1 /*
2  *  Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "test/time_controller/simulated_process_thread.h"
12 
13 #include <algorithm>
14 #include <utility>
15 
16 namespace webrtc {
17 namespace {
18 // Helper function to remove from a std container by value.
19 template <class C>
RemoveByValue(C * vec,typename C::value_type val)20 bool RemoveByValue(C* vec, typename C::value_type val) {
21   auto it = std::find(vec->begin(), vec->end(), val);
22   if (it == vec->end())
23     return false;
24   vec->erase(it);
25   return true;
26 }
27 }  // namespace
SimulatedProcessThread(sim_time_impl::SimulatedTimeControllerImpl * handler,absl::string_view name)28 SimulatedProcessThread::SimulatedProcessThread(
29     sim_time_impl::SimulatedTimeControllerImpl* handler,
30     absl::string_view name)
31     : handler_(handler), name_(new char[name.size()]) {
32   std::copy_n(name.begin(), name.size(), name_);
33 }
34 
~SimulatedProcessThread()35 SimulatedProcessThread::~SimulatedProcessThread() {
36   handler_->Unregister(this);
37   delete[] name_;
38 }
39 
RunReady(Timestamp at_time)40 void SimulatedProcessThread::RunReady(Timestamp at_time) {
41   CurrentTaskQueueSetter set_current(this);
42   rtc::CritScope lock(&lock_);
43   std::vector<Module*> ready_modules;
44   for (auto it = delayed_modules_.begin();
45        it != delayed_modules_.end() && it->first <= at_time;
46        it = delayed_modules_.erase(it)) {
47     for (auto module : it->second) {
48       ready_modules.push_back(module);
49     }
50   }
51   for (auto* module : ready_modules) {
52     module->Process();
53     delayed_modules_[GetNextTime(module, at_time)].push_back(module);
54   }
55 
56   for (auto it = delayed_tasks_.begin();
57        it != delayed_tasks_.end() && it->first <= at_time;
58        it = delayed_tasks_.erase(it)) {
59     for (auto& task : it->second) {
60       queue_.push_back(std::move(task));
61     }
62   }
63   while (!queue_.empty()) {
64     std::unique_ptr<QueuedTask> task = std::move(queue_.front());
65     queue_.pop_front();
66     lock_.Leave();
67     bool should_delete = task->Run();
68     RTC_CHECK(should_delete);
69     lock_.Enter();
70   }
71   RTC_DCHECK(queue_.empty());
72   if (!delayed_modules_.empty()) {
73     next_run_time_ = delayed_modules_.begin()->first;
74   } else {
75     next_run_time_ = Timestamp::PlusInfinity();
76   }
77   if (!delayed_tasks_.empty()) {
78     next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
79   }
80 }
Start()81 void SimulatedProcessThread::Start() {
82   std::vector<Module*> starting;
83   {
84     rtc::CritScope lock(&lock_);
85     if (process_thread_running_)
86       return;
87     process_thread_running_ = true;
88     starting.swap(stopped_modules_);
89   }
90   for (auto& module : starting)
91     module->ProcessThreadAttached(this);
92 
93   Timestamp at_time = handler_->CurrentTime();
94   rtc::CritScope lock(&lock_);
95   for (auto& module : starting)
96     delayed_modules_[GetNextTime(module, at_time)].push_back(module);
97 
98   if (!queue_.empty()) {
99     next_run_time_ = Timestamp::MinusInfinity();
100   } else if (!delayed_modules_.empty()) {
101     next_run_time_ = delayed_modules_.begin()->first;
102   } else {
103     next_run_time_ = Timestamp::PlusInfinity();
104   }
105 }
106 
Stop()107 void SimulatedProcessThread::Stop() {
108   std::vector<Module*> stopping;
109   {
110     rtc::CritScope lock(&lock_);
111     process_thread_running_ = false;
112 
113     for (auto& delayed : delayed_modules_) {
114       for (auto mod : delayed.second)
115         stopped_modules_.push_back(mod);
116     }
117     delayed_modules_.clear();
118 
119     stopping = stopped_modules_;
120   }
121   for (auto& module : stopping)
122     module->ProcessThreadAttached(nullptr);
123 }
124 
WakeUp(Module * module)125 void SimulatedProcessThread::WakeUp(Module* module) {
126   rtc::CritScope lock(&lock_);
127   for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
128     if (RemoveByValue(&it->second, module))
129       break;
130   }
131   Timestamp next_time = GetNextTime(module, handler_->CurrentTime());
132   delayed_modules_[next_time].push_back(module);
133   next_run_time_ = std::min(next_run_time_, next_time);
134 }
135 
RegisterModule(Module * module,const rtc::Location & from)136 void SimulatedProcessThread::RegisterModule(Module* module,
137                                             const rtc::Location& from) {
138   module->ProcessThreadAttached(this);
139   rtc::CritScope lock(&lock_);
140   if (!process_thread_running_) {
141     stopped_modules_.push_back(module);
142   } else {
143     Timestamp next_time = GetNextTime(module, handler_->CurrentTime());
144     delayed_modules_[next_time].push_back(module);
145     next_run_time_ = std::min(next_run_time_, next_time);
146   }
147 }
148 
DeRegisterModule(Module * module)149 void SimulatedProcessThread::DeRegisterModule(Module* module) {
150   bool modules_running;
151   {
152     rtc::CritScope lock(&lock_);
153     if (!process_thread_running_) {
154       RemoveByValue(&stopped_modules_, module);
155     } else {
156       for (auto& pair : delayed_modules_) {
157         if (RemoveByValue(&pair.second, module))
158           break;
159       }
160     }
161     modules_running = process_thread_running_;
162   }
163   if (modules_running)
164     module->ProcessThreadAttached(nullptr);
165 }
166 
PostTask(std::unique_ptr<QueuedTask> task)167 void SimulatedProcessThread::PostTask(std::unique_ptr<QueuedTask> task) {
168   rtc::CritScope lock(&lock_);
169   queue_.emplace_back(std::move(task));
170   next_run_time_ = Timestamp::MinusInfinity();
171 }
172 
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)173 void SimulatedProcessThread::PostDelayedTask(std::unique_ptr<QueuedTask> task,
174                                              uint32_t milliseconds) {
175   rtc::CritScope lock(&lock_);
176   Timestamp target_time =
177       handler_->CurrentTime() + TimeDelta::Millis(milliseconds);
178   delayed_tasks_[target_time].push_back(std::move(task));
179   next_run_time_ = std::min(next_run_time_, target_time);
180 }
181 
GetNextTime(Module * module,Timestamp at_time)182 Timestamp SimulatedProcessThread::GetNextTime(Module* module,
183                                               Timestamp at_time) {
184   CurrentTaskQueueSetter set_current(this);
185   return at_time + TimeDelta::Millis(module->TimeUntilNextProcess());
186 }
187 
188 }  // namespace webrtc
189