1 //
2 // Copyright (c) 2008-2017 the Urho3D project.
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 //
22
23 #include "../Precompiled.h"
24
25 #include "../Core/CoreEvents.h"
26 #include "../Core/ProcessUtils.h"
27 #include "../Core/Profiler.h"
28 #include "../Core/WorkQueue.h"
29 #include "../IO/Log.h"
30
31 namespace Urho3D
32 {
33
34 /// Worker thread managed by the work queue.
35 class WorkerThread : public Thread, public RefCounted
36 {
37 public:
38 /// Construct.
WorkerThread(WorkQueue * owner,unsigned index)39 WorkerThread(WorkQueue* owner, unsigned index) :
40 owner_(owner),
41 index_(index)
42 {
43 }
44
45 /// Process work items until stopped.
ThreadFunction()46 virtual void ThreadFunction()
47 {
48 // Init FPU state first
49 InitFPU();
50 owner_->ProcessItems(index_);
51 }
52
53 /// Return thread index.
GetIndex() const54 unsigned GetIndex() const { return index_; }
55
56 private:
57 /// Work queue.
58 WorkQueue* owner_;
59 /// Thread index.
60 unsigned index_;
61 };
62
WorkQueue(Context * context)63 WorkQueue::WorkQueue(Context* context) :
64 Object(context),
65 shutDown_(false),
66 pausing_(false),
67 paused_(false),
68 completing_(false),
69 tolerance_(10),
70 lastSize_(0),
71 maxNonThreadedWorkMs_(5)
72 {
73 SubscribeToEvent(E_BEGINFRAME, URHO3D_HANDLER(WorkQueue, HandleBeginFrame));
74 }
75
~WorkQueue()76 WorkQueue::~WorkQueue()
77 {
78 // Stop the worker threads. First make sure they are not waiting for work items
79 shutDown_ = true;
80 Resume();
81
82 for (unsigned i = 0; i < threads_.Size(); ++i)
83 threads_[i]->Stop();
84 }
85
CreateThreads(unsigned numThreads)86 void WorkQueue::CreateThreads(unsigned numThreads)
87 {
88 #ifdef URHO3D_THREADING
89 // Other subsystems may initialize themselves according to the number of threads.
90 // Therefore allow creating the threads only once, after which the amount is fixed
91 if (!threads_.Empty())
92 return;
93
94 // Start threads in paused mode
95 Pause();
96
97 for (unsigned i = 0; i < numThreads; ++i)
98 {
99 SharedPtr<WorkerThread> thread(new WorkerThread(this, i + 1));
100 thread->Run();
101 threads_.Push(thread);
102 }
103 #else
104 URHO3D_LOGERROR("Can not create worker threads as threading is disabled");
105 #endif
106 }
107
GetFreeItem()108 SharedPtr<WorkItem> WorkQueue::GetFreeItem()
109 {
110 if (poolItems_.Size() > 0)
111 {
112 SharedPtr<WorkItem> item = poolItems_.Front();
113 poolItems_.PopFront();
114 return item;
115 }
116 else
117 {
118 // No usable items found, create a new one set it as pooled and return it.
119 SharedPtr<WorkItem> item(new WorkItem());
120 item->pooled_ = true;
121 return item;
122 }
123 }
124
AddWorkItem(SharedPtr<WorkItem> item)125 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
126 {
127 if (!item)
128 {
129 URHO3D_LOGERROR("Null work item submitted to the work queue");
130 return;
131 }
132
133 // Check for duplicate items.
134 assert(!workItems_.Contains(item));
135
136 // Push to the main thread list to keep item alive
137 // Clear completed flag in case item is reused
138 workItems_.Push(item);
139 item->completed_ = false;
140
141 // Make sure worker threads' list is safe to modify
142 if (threads_.Size() && !paused_)
143 queueMutex_.Acquire();
144
145 // Find position for new item
146 if (queue_.Empty())
147 queue_.Push(item);
148 else
149 {
150 bool inserted = false;
151
152 for (List<WorkItem*>::Iterator i = queue_.Begin(); i != queue_.End(); ++i)
153 {
154 if ((*i)->priority_ <= item->priority_)
155 {
156 queue_.Insert(i, item);
157 inserted = true;
158 break;
159 }
160 }
161
162 if (!inserted)
163 queue_.Push(item);
164 }
165
166 if (threads_.Size())
167 {
168 queueMutex_.Release();
169 paused_ = false;
170 }
171 }
172
RemoveWorkItem(SharedPtr<WorkItem> item)173 bool WorkQueue::RemoveWorkItem(SharedPtr<WorkItem> item)
174 {
175 if (!item)
176 return false;
177
178 MutexLock lock(queueMutex_);
179
180 // Can only remove successfully if the item was not yet taken by threads for execution
181 List<WorkItem*>::Iterator i = queue_.Find(item.Get());
182 if (i != queue_.End())
183 {
184 List<SharedPtr<WorkItem> >::Iterator j = workItems_.Find(item);
185 if (j != workItems_.End())
186 {
187 queue_.Erase(i);
188 ReturnToPool(item);
189 workItems_.Erase(j);
190 return true;
191 }
192 }
193
194 return false;
195 }
196
RemoveWorkItems(const Vector<SharedPtr<WorkItem>> & items)197 unsigned WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem> >& items)
198 {
199 MutexLock lock(queueMutex_);
200 unsigned removed = 0;
201
202 for (Vector<SharedPtr<WorkItem> >::ConstIterator i = items.Begin(); i != items.End(); ++i)
203 {
204 List<WorkItem*>::Iterator j = queue_.Find(i->Get());
205 if (j != queue_.End())
206 {
207 List<SharedPtr<WorkItem> >::Iterator k = workItems_.Find(*i);
208 if (k != workItems_.End())
209 {
210 queue_.Erase(j);
211 ReturnToPool(*k);
212 workItems_.Erase(k);
213 ++removed;
214 }
215 }
216 }
217
218 return removed;
219 }
220
Pause()221 void WorkQueue::Pause()
222 {
223 if (!paused_)
224 {
225 pausing_ = true;
226
227 queueMutex_.Acquire();
228 paused_ = true;
229
230 pausing_ = false;
231 }
232 }
233
Resume()234 void WorkQueue::Resume()
235 {
236 if (paused_)
237 {
238 queueMutex_.Release();
239 paused_ = false;
240 }
241 }
242
243
Complete(unsigned priority)244 void WorkQueue::Complete(unsigned priority)
245 {
246 completing_ = true;
247
248 if (threads_.Size())
249 {
250 Resume();
251
252 // Take work items also in the main thread until queue empty or no high-priority items anymore
253 while (!queue_.Empty())
254 {
255 queueMutex_.Acquire();
256 if (!queue_.Empty() && queue_.Front()->priority_ >= priority)
257 {
258 WorkItem* item = queue_.Front();
259 queue_.PopFront();
260 queueMutex_.Release();
261 item->workFunction_(item, 0);
262 item->completed_ = true;
263 }
264 else
265 {
266 queueMutex_.Release();
267 break;
268 }
269 }
270
271 // Wait for threaded work to complete
272 while (!IsCompleted(priority))
273 {
274 }
275
276 // If no work at all remaining, pause worker threads by leaving the mutex locked
277 if (queue_.Empty())
278 Pause();
279 }
280 else
281 {
282 // No worker threads: ensure all high-priority items are completed in the main thread
283 while (!queue_.Empty() && queue_.Front()->priority_ >= priority)
284 {
285 WorkItem* item = queue_.Front();
286 queue_.PopFront();
287 item->workFunction_(item, 0);
288 item->completed_ = true;
289 }
290 }
291
292 PurgeCompleted(priority);
293 completing_ = false;
294 }
295
IsCompleted(unsigned priority) const296 bool WorkQueue::IsCompleted(unsigned priority) const
297 {
298 for (List<SharedPtr<WorkItem> >::ConstIterator i = workItems_.Begin(); i != workItems_.End(); ++i)
299 {
300 if ((*i)->priority_ >= priority && !(*i)->completed_)
301 return false;
302 }
303
304 return true;
305 }
306
ProcessItems(unsigned threadIndex)307 void WorkQueue::ProcessItems(unsigned threadIndex)
308 {
309 bool wasActive = false;
310
311 for (;;)
312 {
313 if (shutDown_)
314 return;
315
316 if (pausing_ && !wasActive)
317 Time::Sleep(0);
318 else
319 {
320 queueMutex_.Acquire();
321 if (!queue_.Empty())
322 {
323 wasActive = true;
324
325 WorkItem* item = queue_.Front();
326 queue_.PopFront();
327 queueMutex_.Release();
328 item->workFunction_(item, threadIndex);
329 item->completed_ = true;
330 }
331 else
332 {
333 wasActive = false;
334
335 queueMutex_.Release();
336 Time::Sleep(0);
337 }
338 }
339 }
340 }
341
PurgeCompleted(unsigned priority)342 void WorkQueue::PurgeCompleted(unsigned priority)
343 {
344 // Purge completed work items and send completion events. Do not signal items lower than priority threshold,
345 // as those may be user submitted and lead to eg. scene manipulation that could happen in the middle of the
346 // render update, which is not allowed
347 for (List<SharedPtr<WorkItem> >::Iterator i = workItems_.Begin(); i != workItems_.End();)
348 {
349 if ((*i)->completed_ && (*i)->priority_ >= priority)
350 {
351 if ((*i)->sendEvent_)
352 {
353 using namespace WorkItemCompleted;
354
355 VariantMap& eventData = GetEventDataMap();
356 eventData[P_ITEM] = i->Get();
357 SendEvent(E_WORKITEMCOMPLETED, eventData);
358 }
359
360 ReturnToPool(*i);
361 i = workItems_.Erase(i);
362 }
363 else
364 ++i;
365 }
366 }
367
PurgePool()368 void WorkQueue::PurgePool()
369 {
370 unsigned currentSize = poolItems_.Size();
371 int difference = lastSize_ - currentSize;
372
373 // Difference tolerance, should be fairly significant to reduce the pool size.
374 for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < (unsigned)difference; i++)
375 poolItems_.PopFront();
376
377 lastSize_ = currentSize;
378 }
379
ReturnToPool(SharedPtr<WorkItem> & item)380 void WorkQueue::ReturnToPool(SharedPtr<WorkItem>& item)
381 {
382 // Check if this was a pooled item and set it to usable
383 if (item->pooled_)
384 {
385 // Reset the values to their defaults. This should
386 // be safe to do here as the completed event has
387 // already been handled and this is part of the
388 // internal pool.
389 item->start_ = 0;
390 item->end_ = 0;
391 item->aux_ = 0;
392 item->workFunction_ = 0;
393 item->priority_ = M_MAX_UNSIGNED;
394 item->sendEvent_ = false;
395 item->completed_ = false;
396
397 poolItems_.Push(item);
398 }
399 }
400
HandleBeginFrame(StringHash eventType,VariantMap & eventData)401 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
402 {
403 // If no worker threads, complete low-priority work here
404 if (threads_.Empty() && !queue_.Empty())
405 {
406 URHO3D_PROFILE(CompleteWorkNonthreaded);
407
408 HiresTimer timer;
409
410 while (!queue_.Empty() && timer.GetUSec(false) < maxNonThreadedWorkMs_ * 1000)
411 {
412 WorkItem* item = queue_.Front();
413 queue_.PopFront();
414 item->workFunction_(item, 0);
415 item->completed_ = true;
416 }
417 }
418
419 // Complete and signal items down to the lowest priority
420 PurgeCompleted(0);
421 PurgePool();
422 }
423
424 }
425