1 //
2 // Copyright (c) 2008-2017 the Urho3D project.
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 //
22 
23 #include "../Precompiled.h"
24 
25 #include "../Core/CoreEvents.h"
26 #include "../Core/ProcessUtils.h"
27 #include "../Core/Profiler.h"
28 #include "../Core/WorkQueue.h"
29 #include "../IO/Log.h"
30 
31 namespace Urho3D
32 {
33 
34 /// Worker thread managed by the work queue.
35 class WorkerThread : public Thread, public RefCounted
36 {
37 public:
38     /// Construct.
WorkerThread(WorkQueue * owner,unsigned index)39     WorkerThread(WorkQueue* owner, unsigned index) :
40         owner_(owner),
41         index_(index)
42     {
43     }
44 
45     /// Process work items until stopped.
ThreadFunction()46     virtual void ThreadFunction()
47     {
48         // Init FPU state first
49         InitFPU();
50         owner_->ProcessItems(index_);
51     }
52 
53     /// Return thread index.
GetIndex() const54     unsigned GetIndex() const { return index_; }
55 
56 private:
57     /// Work queue.
58     WorkQueue* owner_;
59     /// Thread index.
60     unsigned index_;
61 };
62 
WorkQueue(Context * context)63 WorkQueue::WorkQueue(Context* context) :
64     Object(context),
65     shutDown_(false),
66     pausing_(false),
67     paused_(false),
68     completing_(false),
69     tolerance_(10),
70     lastSize_(0),
71     maxNonThreadedWorkMs_(5)
72 {
73     SubscribeToEvent(E_BEGINFRAME, URHO3D_HANDLER(WorkQueue, HandleBeginFrame));
74 }
75 
~WorkQueue()76 WorkQueue::~WorkQueue()
77 {
78     // Stop the worker threads. First make sure they are not waiting for work items
79     shutDown_ = true;
80     Resume();
81 
82     for (unsigned i = 0; i < threads_.Size(); ++i)
83         threads_[i]->Stop();
84 }
85 
CreateThreads(unsigned numThreads)86 void WorkQueue::CreateThreads(unsigned numThreads)
87 {
88 #ifdef URHO3D_THREADING
89     // Other subsystems may initialize themselves according to the number of threads.
90     // Therefore allow creating the threads only once, after which the amount is fixed
91     if (!threads_.Empty())
92         return;
93 
94     // Start threads in paused mode
95     Pause();
96 
97     for (unsigned i = 0; i < numThreads; ++i)
98     {
99         SharedPtr<WorkerThread> thread(new WorkerThread(this, i + 1));
100         thread->Run();
101         threads_.Push(thread);
102     }
103 #else
104     URHO3D_LOGERROR("Can not create worker threads as threading is disabled");
105 #endif
106 }
107 
GetFreeItem()108 SharedPtr<WorkItem> WorkQueue::GetFreeItem()
109 {
110     if (poolItems_.Size() > 0)
111     {
112         SharedPtr<WorkItem> item = poolItems_.Front();
113         poolItems_.PopFront();
114         return item;
115     }
116     else
117     {
118         // No usable items found, create a new one set it as pooled and return it.
119         SharedPtr<WorkItem> item(new WorkItem());
120         item->pooled_ = true;
121         return item;
122     }
123 }
124 
AddWorkItem(SharedPtr<WorkItem> item)125 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
126 {
127     if (!item)
128     {
129         URHO3D_LOGERROR("Null work item submitted to the work queue");
130         return;
131     }
132 
133     // Check for duplicate items.
134     assert(!workItems_.Contains(item));
135 
136     // Push to the main thread list to keep item alive
137     // Clear completed flag in case item is reused
138     workItems_.Push(item);
139     item->completed_ = false;
140 
141     // Make sure worker threads' list is safe to modify
142     if (threads_.Size() && !paused_)
143         queueMutex_.Acquire();
144 
145     // Find position for new item
146     if (queue_.Empty())
147         queue_.Push(item);
148     else
149     {
150         bool inserted = false;
151 
152         for (List<WorkItem*>::Iterator i = queue_.Begin(); i != queue_.End(); ++i)
153         {
154             if ((*i)->priority_ <= item->priority_)
155             {
156                 queue_.Insert(i, item);
157                 inserted = true;
158                 break;
159             }
160         }
161 
162         if (!inserted)
163             queue_.Push(item);
164     }
165 
166     if (threads_.Size())
167     {
168         queueMutex_.Release();
169         paused_ = false;
170     }
171 }
172 
RemoveWorkItem(SharedPtr<WorkItem> item)173 bool WorkQueue::RemoveWorkItem(SharedPtr<WorkItem> item)
174 {
175     if (!item)
176         return false;
177 
178     MutexLock lock(queueMutex_);
179 
180     // Can only remove successfully if the item was not yet taken by threads for execution
181     List<WorkItem*>::Iterator i = queue_.Find(item.Get());
182     if (i != queue_.End())
183     {
184         List<SharedPtr<WorkItem> >::Iterator j = workItems_.Find(item);
185         if (j != workItems_.End())
186         {
187             queue_.Erase(i);
188             ReturnToPool(item);
189             workItems_.Erase(j);
190             return true;
191         }
192     }
193 
194     return false;
195 }
196 
RemoveWorkItems(const Vector<SharedPtr<WorkItem>> & items)197 unsigned WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem> >& items)
198 {
199     MutexLock lock(queueMutex_);
200     unsigned removed = 0;
201 
202     for (Vector<SharedPtr<WorkItem> >::ConstIterator i = items.Begin(); i != items.End(); ++i)
203     {
204         List<WorkItem*>::Iterator j = queue_.Find(i->Get());
205         if (j != queue_.End())
206         {
207             List<SharedPtr<WorkItem> >::Iterator k = workItems_.Find(*i);
208             if (k != workItems_.End())
209             {
210                 queue_.Erase(j);
211                 ReturnToPool(*k);
212                 workItems_.Erase(k);
213                 ++removed;
214             }
215         }
216     }
217 
218     return removed;
219 }
220 
Pause()221 void WorkQueue::Pause()
222 {
223     if (!paused_)
224     {
225         pausing_ = true;
226 
227         queueMutex_.Acquire();
228         paused_ = true;
229 
230         pausing_ = false;
231     }
232 }
233 
Resume()234 void WorkQueue::Resume()
235 {
236     if (paused_)
237     {
238         queueMutex_.Release();
239         paused_ = false;
240     }
241 }
242 
243 
Complete(unsigned priority)244 void WorkQueue::Complete(unsigned priority)
245 {
246     completing_ = true;
247 
248     if (threads_.Size())
249     {
250         Resume();
251 
252         // Take work items also in the main thread until queue empty or no high-priority items anymore
253         while (!queue_.Empty())
254         {
255             queueMutex_.Acquire();
256             if (!queue_.Empty() && queue_.Front()->priority_ >= priority)
257             {
258                 WorkItem* item = queue_.Front();
259                 queue_.PopFront();
260                 queueMutex_.Release();
261                 item->workFunction_(item, 0);
262                 item->completed_ = true;
263             }
264             else
265             {
266                 queueMutex_.Release();
267                 break;
268             }
269         }
270 
271         // Wait for threaded work to complete
272         while (!IsCompleted(priority))
273         {
274         }
275 
276         // If no work at all remaining, pause worker threads by leaving the mutex locked
277         if (queue_.Empty())
278             Pause();
279     }
280     else
281     {
282         // No worker threads: ensure all high-priority items are completed in the main thread
283         while (!queue_.Empty() && queue_.Front()->priority_ >= priority)
284         {
285             WorkItem* item = queue_.Front();
286             queue_.PopFront();
287             item->workFunction_(item, 0);
288             item->completed_ = true;
289         }
290     }
291 
292     PurgeCompleted(priority);
293     completing_ = false;
294 }
295 
IsCompleted(unsigned priority) const296 bool WorkQueue::IsCompleted(unsigned priority) const
297 {
298     for (List<SharedPtr<WorkItem> >::ConstIterator i = workItems_.Begin(); i != workItems_.End(); ++i)
299     {
300         if ((*i)->priority_ >= priority && !(*i)->completed_)
301             return false;
302     }
303 
304     return true;
305 }
306 
ProcessItems(unsigned threadIndex)307 void WorkQueue::ProcessItems(unsigned threadIndex)
308 {
309     bool wasActive = false;
310 
311     for (;;)
312     {
313         if (shutDown_)
314             return;
315 
316         if (pausing_ && !wasActive)
317             Time::Sleep(0);
318         else
319         {
320             queueMutex_.Acquire();
321             if (!queue_.Empty())
322             {
323                 wasActive = true;
324 
325                 WorkItem* item = queue_.Front();
326                 queue_.PopFront();
327                 queueMutex_.Release();
328                 item->workFunction_(item, threadIndex);
329                 item->completed_ = true;
330             }
331             else
332             {
333                 wasActive = false;
334 
335                 queueMutex_.Release();
336                 Time::Sleep(0);
337             }
338         }
339     }
340 }
341 
PurgeCompleted(unsigned priority)342 void WorkQueue::PurgeCompleted(unsigned priority)
343 {
344     // Purge completed work items and send completion events. Do not signal items lower than priority threshold,
345     // as those may be user submitted and lead to eg. scene manipulation that could happen in the middle of the
346     // render update, which is not allowed
347     for (List<SharedPtr<WorkItem> >::Iterator i = workItems_.Begin(); i != workItems_.End();)
348     {
349         if ((*i)->completed_ && (*i)->priority_ >= priority)
350         {
351             if ((*i)->sendEvent_)
352             {
353                 using namespace WorkItemCompleted;
354 
355                 VariantMap& eventData = GetEventDataMap();
356                 eventData[P_ITEM] = i->Get();
357                 SendEvent(E_WORKITEMCOMPLETED, eventData);
358             }
359 
360             ReturnToPool(*i);
361             i = workItems_.Erase(i);
362         }
363         else
364             ++i;
365     }
366 }
367 
PurgePool()368 void WorkQueue::PurgePool()
369 {
370     unsigned currentSize = poolItems_.Size();
371     int difference = lastSize_ - currentSize;
372 
373     // Difference tolerance, should be fairly significant to reduce the pool size.
374     for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < (unsigned)difference; i++)
375         poolItems_.PopFront();
376 
377     lastSize_ = currentSize;
378 }
379 
ReturnToPool(SharedPtr<WorkItem> & item)380 void WorkQueue::ReturnToPool(SharedPtr<WorkItem>& item)
381 {
382     // Check if this was a pooled item and set it to usable
383     if (item->pooled_)
384     {
385         // Reset the values to their defaults. This should
386         // be safe to do here as the completed event has
387         // already been handled and this is part of the
388         // internal pool.
389         item->start_ = 0;
390         item->end_ = 0;
391         item->aux_ = 0;
392         item->workFunction_ = 0;
393         item->priority_ = M_MAX_UNSIGNED;
394         item->sendEvent_ = false;
395         item->completed_ = false;
396 
397         poolItems_.Push(item);
398     }
399 }
400 
HandleBeginFrame(StringHash eventType,VariantMap & eventData)401 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
402 {
403     // If no worker threads, complete low-priority work here
404     if (threads_.Empty() && !queue_.Empty())
405     {
406         URHO3D_PROFILE(CompleteWorkNonthreaded);
407 
408         HiresTimer timer;
409 
410         while (!queue_.Empty() && timer.GetUSec(false) < maxNonThreadedWorkMs_ * 1000)
411         {
412             WorkItem* item = queue_.Front();
413             queue_.PopFront();
414             item->workFunction_(item, 0);
415             item->completed_ = true;
416         }
417     }
418 
419     // Complete and signal items down to the lowest priority
420     PurgeCompleted(0);
421     PurgePool();
422 }
423 
424 }
425