1 /*
2 -----------------------------------------------------------------------------
3 This source file is part of OGRE
4 (Object-oriented Graphics Rendering Engine)
5 For the latest info, see http://www.ogre3d.org/
6 
7 Copyright (c) 2000-2013 Torus Knot Software Ltd
8 
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15 
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18 
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 THE SOFTWARE.
26 -----------------------------------------------------------------------------
27 */
28 #include "OgreStableHeaders.h"
29 #include "OgreWorkQueue.h"
30 #include "OgreLogManager.h"
31 #include "OgreRoot.h"
32 #include "OgreRenderSystem.h"
33 
34 namespace Ogre {
35 	//---------------------------------------------------------------------
getChannel(const String & channelName)36 	uint16 WorkQueue::getChannel(const String& channelName)
37 	{
38             OGRE_LOCK_MUTEX(mChannelMapMutex);
39 
40 		ChannelMap::iterator i = mChannelMap.find(channelName);
41 		if (i == mChannelMap.end())
42 		{
43 			i = mChannelMap.insert(ChannelMap::value_type(channelName, mNextChannel++)).first;
44 		}
45 		return i->second;
46 	}
47 	//---------------------------------------------------------------------
Request(uint16 channel,uint16 rtype,const Any & rData,uint8 retry,RequestID rid)48 	WorkQueue::Request::Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid)
49 		: mChannel(channel), mType(rtype), mData(rData), mRetryCount(retry), mID(rid), mAborted(false)
50 	{
51 
52 	}
53 	//---------------------------------------------------------------------
~Request()54 	WorkQueue::Request::~Request()
55 	{
56 
57 	}
58 	//---------------------------------------------------------------------
59 	//---------------------------------------------------------------------
Response(const Request * rq,bool success,const Any & data,const String & msg)60 	WorkQueue::Response::Response(const Request* rq, bool success, const Any& data, const String& msg)
61 		: mRequest(rq), mSuccess(success), mMessages(msg), mData(data)
62 	{
63 
64 	}
65 	//---------------------------------------------------------------------
~Response()66 	WorkQueue::Response::~Response()
67 	{
68 		OGRE_DELETE mRequest;
69 	}
70 	//---------------------------------------------------------------------
71 	//---------------------------------------------------------------------
DefaultWorkQueueBase(const String & name)72 	DefaultWorkQueueBase::DefaultWorkQueueBase(const String& name)
73 		: mName(name)
74 		, mWorkerThreadCount(1)
75 		, mWorkerRenderSystemAccess(false)
76 		, mIsRunning(false)
77 		, mResposeTimeLimitMS(8)
78 		, mWorkerFunc(0)
79 		, mRequestCount(0)
80 		, mPaused(false)
81 		, mAcceptRequests(true)
82         , mShuttingDown(false)
83         , mIdleProcessed(0)
84 	{
85 	}
86 	//---------------------------------------------------------------------
getName() const87 	const String& DefaultWorkQueueBase::getName() const
88 	{
89 		return mName;
90 	}
91 	//---------------------------------------------------------------------
getWorkerThreadCount() const92 	size_t DefaultWorkQueueBase::getWorkerThreadCount() const
93 	{
94 		return mWorkerThreadCount;
95 	}
96 	//---------------------------------------------------------------------
setWorkerThreadCount(size_t c)97 	void DefaultWorkQueueBase::setWorkerThreadCount(size_t c)
98 	{
99 		mWorkerThreadCount = c;
100 	}
101 	//---------------------------------------------------------------------
getWorkersCanAccessRenderSystem() const102 	bool DefaultWorkQueueBase::getWorkersCanAccessRenderSystem() const
103 	{
104 		return mWorkerRenderSystemAccess;
105 	}
106 	//---------------------------------------------------------------------
setWorkersCanAccessRenderSystem(bool access)107 	void DefaultWorkQueueBase::setWorkersCanAccessRenderSystem(bool access)
108 	{
109 		mWorkerRenderSystemAccess = access;
110 	}
111 	//---------------------------------------------------------------------
~DefaultWorkQueueBase()112 	DefaultWorkQueueBase::~DefaultWorkQueueBase()
113 	{
114 		//shutdown(); // can't call here; abstract function
115 
116 		for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
117 		{
118 			OGRE_DELETE (*i);
119 		}
120 		mRequestQueue.clear();
121 
122 		for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
123 		{
124 			OGRE_DELETE (*i);
125 		}
126 		mResponseQueue.clear();
127 	}
128 	//---------------------------------------------------------------------
addRequestHandler(uint16 channel,RequestHandler * rh)129 	void DefaultWorkQueueBase::addRequestHandler(uint16 channel, RequestHandler* rh)
130 	{
131             OGRE_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex);
132 
133 		RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel);
134 		if (i == mRequestHandlers.end())
135 			i = mRequestHandlers.insert(RequestHandlerListByChannel::value_type(channel, RequestHandlerList())).first;
136 
137 		RequestHandlerList& handlers = i->second;
138 		bool duplicate = false;
139 		for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j)
140 		{
141 			if ((*j)->getHandler() == rh)
142 			{
143 				duplicate = true;
144 				break;
145 			}
146 		}
147 		if (!duplicate)
148 			handlers.push_back(RequestHandlerHolderPtr(OGRE_NEW RequestHandlerHolder(rh)));
149 
150 	}
151 	//---------------------------------------------------------------------
removeRequestHandler(uint16 channel,RequestHandler * rh)152 	void DefaultWorkQueueBase::removeRequestHandler(uint16 channel, RequestHandler* rh)
153 	{
154 		OGRE_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex);
155 
156 		RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel);
157 		if (i != mRequestHandlers.end())
158 		{
159 			RequestHandlerList& handlers = i->second;
160 			for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j)
161 			{
162 				if ((*j)->getHandler() == rh)
163 				{
164 					// Disconnect - this will make it safe across copies of the list
165 					// this is threadsafe and will wait for existing processes to finish
166 					(*j)->disconnectHandler();
167 					handlers.erase(j);
168 					break;
169 				}
170 			}
171 
172 		}
173 
174 	}
175 	//---------------------------------------------------------------------
addResponseHandler(uint16 channel,ResponseHandler * rh)176 	void DefaultWorkQueueBase::addResponseHandler(uint16 channel, ResponseHandler* rh)
177 	{
178 		ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel);
179 		if (i == mResponseHandlers.end())
180 			i = mResponseHandlers.insert(ResponseHandlerListByChannel::value_type(channel, ResponseHandlerList())).first;
181 
182 		ResponseHandlerList& handlers = i->second;
183 		if (std::find(handlers.begin(), handlers.end(), rh) == handlers.end())
184 			handlers.push_back(rh);
185 	}
186 	//---------------------------------------------------------------------
removeResponseHandler(uint16 channel,ResponseHandler * rh)187 	void DefaultWorkQueueBase::removeResponseHandler(uint16 channel, ResponseHandler* rh)
188 	{
189 		ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel);
190 		if (i != mResponseHandlers.end())
191 		{
192 			ResponseHandlerList& handlers = i->second;
193 			ResponseHandlerList::iterator j = std::find(
194 				handlers.begin(), handlers.end(), rh);
195 			if (j != handlers.end())
196 				handlers.erase(j);
197 
198 		}
199 	}
200 	//---------------------------------------------------------------------
addRequest(uint16 channel,uint16 requestType,const Any & rData,uint8 retryCount,bool forceSynchronous,bool idleThread)201 	WorkQueue::RequestID DefaultWorkQueueBase::addRequest(uint16 channel, uint16 requestType,
202 		const Any& rData, uint8 retryCount, bool forceSynchronous, bool idleThread)
203 	{
204 		Request* req = 0;
205 		RequestID rid = 0;
206 
207 		{
208 			// lock to acquire rid and push request to the queue
209                     OGRE_LOCK_MUTEX(mRequestMutex);
210 
211 			if (!mAcceptRequests || mShuttingDown)
212 				return 0;
213 
214 			rid = ++mRequestCount;
215 			req = OGRE_NEW Request(channel, requestType, rData, retryCount, rid);
216 
217 			LogManager::getSingleton().stream(LML_TRIVIAL) <<
218 				"DefaultWorkQueueBase('" << mName << "') - QUEUED(thread:" <<
219 #if OGRE_THREAD_SUPPORT
220 				OGRE_THREAD_CURRENT_ID
221 #else
222 				"main"
223 #endif
224 				<< "): ID=" << rid
225 			    << " channel=" << channel << " requestType=" << requestType;
226 #if OGRE_THREAD_SUPPORT
227 			if (!forceSynchronous&& !idleThread)
228 			{
229 				mRequestQueue.push_back(req);
230 				notifyWorkers();
231 				return rid;
232 			}
233 #endif
234 		}
235 		if(idleThread){
236 			OGRE_LOCK_MUTEX(mIdleMutex);
237 			mIdleRequestQueue.push_back(req);
238 			if(!mIdleThreadRunning)
239 			{
240 				notifyWorkers();
241 			}
242 		} else { //forceSynchronous
243 			processRequestResponse(req, true);
244 		}
245 		return rid;
246 
247 	}
248 	//---------------------------------------------------------------------
addRequestWithRID(WorkQueue::RequestID rid,uint16 channel,uint16 requestType,const Any & rData,uint8 retryCount)249 	void DefaultWorkQueueBase::addRequestWithRID(WorkQueue::RequestID rid, uint16 channel,
250 		uint16 requestType, const Any& rData, uint8 retryCount)
251 	{
252 		// lock to push request to the queue
253             OGRE_LOCK_MUTEX(mRequestMutex);
254 
255 		if (mShuttingDown)
256 			return;
257 
258 		Request* req = OGRE_NEW Request(channel, requestType, rData, retryCount, rid);
259 
260 		LogManager::getSingleton().stream(LML_TRIVIAL) <<
261 			"DefaultWorkQueueBase('" << mName << "') - REQUEUED(thread:" <<
262 #if OGRE_THREAD_SUPPORT
263 			OGRE_THREAD_CURRENT_ID
264 #else
265 			"main"
266 #endif
267 			<< "): ID=" << rid
268 				   << " channel=" << channel << " requestType=" << requestType;
269 #if OGRE_THREAD_SUPPORT
270 		mRequestQueue.push_back(req);
271 		notifyWorkers();
272 #else
273 		processRequestResponse(req, true);
274 #endif
275 	}
276 	//---------------------------------------------------------------------
abortRequest(RequestID id)277 	void DefaultWorkQueueBase::abortRequest(RequestID id)
278 	{
279             OGRE_LOCK_MUTEX(mProcessMutex);
280 
281 		// NOTE: Pending requests are exist any of RequestQueue, ProcessQueue and
282 		// ResponseQueue when keeping ProcessMutex, so we check all of these queues.
283 
284 		for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
285 		{
286 			if ((*i)->getID() == id)
287 			{
288 				(*i)->abortRequest();
289 				break;
290 			}
291 		}
292 
293 		{
294                     OGRE_LOCK_MUTEX(mRequestMutex);
295 
296 			for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
297 			{
298 				if ((*i)->getID() == id)
299 				{
300 					(*i)->abortRequest();
301 					break;
302 				}
303 			}
304 		}
305 
306 		{
307 			if(mIdleProcessed)
308 			{
309 				mIdleProcessed->abortRequest();
310 			}
311 
312 			OGRE_LOCK_MUTEX(mIdleMutex);
313 			for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i)
314 			{
315 				(*i)->abortRequest();
316 			}
317 		}
318 
319 		{
320                     OGRE_LOCK_MUTEX(mResponseMutex);
321 
322 			for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
323 			{
324 				if( (*i)->getRequest()->getID() == id )
325 				{
326 					(*i)->abortRequest();
327 					break;
328 				}
329 			}
330 		}
331 	}
332 	//---------------------------------------------------------------------
abortRequestsByChannel(uint16 channel)333 	void DefaultWorkQueueBase::abortRequestsByChannel(uint16 channel)
334 	{
335             OGRE_LOCK_MUTEX(mProcessMutex);
336 
337 		for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
338 		{
339 			if ((*i)->getChannel() == channel)
340 			{
341 				(*i)->abortRequest();
342 			}
343 		}
344 
345 		{
346                     OGRE_LOCK_MUTEX(mRequestMutex);
347 
348 			for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
349 			{
350 				if ((*i)->getChannel() == channel)
351 				{
352 					(*i)->abortRequest();
353 				}
354 			}
355 		}
356 		{
357 			if (mIdleProcessed && mIdleProcessed->getChannel() == channel)
358 			{
359 				mIdleProcessed->abortRequest();
360 			}
361 
362 			OGRE_LOCK_MUTEX(mIdleMutex);
363 
364 			for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i)
365 			{
366 				if ((*i)->getChannel() == channel)
367 				{
368 					(*i)->abortRequest();
369 				}
370 			}
371 		}
372 
373 		{
374                     OGRE_LOCK_MUTEX(mResponseMutex);
375 
376 			for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
377 			{
378 				if( (*i)->getRequest()->getChannel() == channel )
379 				{
380 					(*i)->abortRequest();
381 				}
382 			}
383 		}
384 	}
385 	//---------------------------------------------------------------------
abortPendingRequestsByChannel(uint16 channel)386 	void DefaultWorkQueueBase::abortPendingRequestsByChannel(uint16 channel)
387 	{
388 		{
389                     OGRE_LOCK_MUTEX(mRequestMutex);
390 			for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
391 			{
392 				if ((*i)->getChannel() == channel)
393 				{
394 					(*i)->abortRequest();
395 				}
396 			}
397 		}
398 		{
399                     OGRE_LOCK_MUTEX(mIdleMutex);
400 
401 				for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i)
402 				{
403 					if ((*i)->getChannel() == channel)
404 					{
405 						(*i)->abortRequest();
406 					}
407 				}
408 		}
409 	}
410 	//---------------------------------------------------------------------
abortAllRequests()411 	void DefaultWorkQueueBase::abortAllRequests()
412 	{
413             OGRE_LOCK_MUTEX(mProcessMutex);
414 		{
415 			for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
416 			{
417 				(*i)->abortRequest();
418 			}
419 		}
420 
421 
422 		{
423                     OGRE_LOCK_MUTEX(mRequestMutex);
424 
425 			for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
426 			{
427 				(*i)->abortRequest();
428 			}
429 		}
430 
431 		{
432 
433 			if(mIdleProcessed)
434 			{
435 				mIdleProcessed->abortRequest();
436 			}
437 
438 			OGRE_LOCK_MUTEX(mIdleMutex);
439 
440 			for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i)
441 			{
442 				(*i)->abortRequest();
443 			}
444 		}
445 
446 		{
447                     OGRE_LOCK_MUTEX(mResponseMutex);
448 
449 			for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
450 			{
451 				(*i)->abortRequest();
452 			}
453 		}
454 
455 	}
456 	//---------------------------------------------------------------------
setPaused(bool pause)457 	void DefaultWorkQueueBase::setPaused(bool pause)
458 	{
459             OGRE_LOCK_MUTEX(mRequestMutex);
460 
461 		mPaused = pause;
462 	}
463 	//---------------------------------------------------------------------
isPaused() const464 	bool DefaultWorkQueueBase::isPaused() const
465 	{
466 		return mPaused;
467 	}
468 	//---------------------------------------------------------------------
setRequestsAccepted(bool accept)469 	void DefaultWorkQueueBase::setRequestsAccepted(bool accept)
470 	{
471             OGRE_LOCK_MUTEX(mRequestMutex);
472 
473 		mAcceptRequests = accept;
474 	}
475 	//---------------------------------------------------------------------
getRequestsAccepted() const476 	bool DefaultWorkQueueBase::getRequestsAccepted() const
477 	{
478 		return mAcceptRequests;
479 	}
480 	//---------------------------------------------------------------------
_processNextRequest()481 	void DefaultWorkQueueBase::_processNextRequest()
482 	{
483 		if(processIdleRequests()){
484 			// Found idle requests.
485 			return;
486 		}
487 		Request* request = 0;
488 		{
489 			// scoped to only lock while retrieving the next request
490                     OGRE_LOCK_MUTEX(mProcessMutex);
491 			{
492                             OGRE_LOCK_MUTEX(mRequestMutex);
493 
494 				if (!mRequestQueue.empty())
495 				{
496 					request = mRequestQueue.front();
497 					mRequestQueue.pop_front();
498 					mProcessQueue.push_back( request );
499 				}
500 			}
501 		}
502 
503 		if (request)
504 		{
505 			processRequestResponse(request, false);
506 		}
507 
508 
509 	}
510 	//---------------------------------------------------------------------
processRequestResponse(Request * r,bool synchronous)511 	void DefaultWorkQueueBase::processRequestResponse(Request* r, bool synchronous)
512 	{
513 		Response* response = processRequest(r);
514 
515 		OGRE_LOCK_MUTEX(mProcessMutex);
516 
517 		RequestQueue::iterator it;
518 		for( it = mProcessQueue.begin(); it != mProcessQueue.end(); ++it )
519 		{
520 			if( (*it) == r )
521 			{
522 				mProcessQueue.erase( it );
523 				break;
524 			}
525 		}
526 		if( mIdleProcessed == r )
527 		{
528 			mIdleProcessed = 0;
529 		}
530 		if (response)
531 		{
532 			if (!response->succeeded())
533 			{
534 				// Failed, should we retry?
535 				const Request* req = response->getRequest();
536 				if (req->getRetryCount())
537 				{
538 					addRequestWithRID(req->getID(), req->getChannel(), req->getType(), req->getData(),
539 						req->getRetryCount() - 1);
540 					// discard response (this also deletes request)
541 					OGRE_DELETE response;
542 					return;
543 				}
544 			}
545 			if (synchronous)
546 			{
547 				processResponse(response);
548 				OGRE_DELETE response;
549 			}
550 			else
551 			{
552 				if( response->getRequest()->getAborted() )
553 				{
554 					// destroy response user data
555 					response->abortRequest();
556 				}
557 				// Queue response
558 				OGRE_LOCK_MUTEX(mResponseMutex);
559 				mResponseQueue.push_back(response);
560 				// no need to wake thread, this is processed by the main thread
561 			}
562 
563 		}
564 		else
565 		{
566 			// no response, delete request
567 			LogManager::getSingleton().stream() <<
568 				"DefaultWorkQueueBase('" << mName << "') warning: no handler processed request "
569 				<< r->getID() << ", channel " << r->getChannel()
570 				<< ", type " << r->getType();
571 			OGRE_DELETE r;
572 		}
573 
574 	}
575 	//---------------------------------------------------------------------
processResponses()576 	void DefaultWorkQueueBase::processResponses()
577 	{
578 		unsigned long msStart = Root::getSingleton().getTimer()->getMilliseconds();
579 		unsigned long msCurrent = 0;
580 
581 		// keep going until we run out of responses or out of time
582 		while(true)
583 		{
584 			Response* response = 0;
585 			{
586                             OGRE_LOCK_MUTEX(mResponseMutex);
587 
588 				if (mResponseQueue.empty())
589 					break; // exit loop
590 				else
591 				{
592 					response = mResponseQueue.front();
593 					mResponseQueue.pop_front();
594 				}
595 			}
596 
597 			if (response)
598 			{
599 				processResponse(response);
600 
601 				OGRE_DELETE response;
602 
603 			}
604 
605 			// time limit
606 			if (mResposeTimeLimitMS)
607 			{
608 				msCurrent = Root::getSingleton().getTimer()->getMilliseconds();
609 				if (msCurrent - msStart > mResposeTimeLimitMS)
610 					break;
611 			}
612 		}
613 	}
614 	//---------------------------------------------------------------------
processRequest(Request * r)615 	WorkQueue::Response* DefaultWorkQueueBase::processRequest(Request* r)
616 	{
617 		RequestHandlerListByChannel handlerListCopy;
618 		{
619 			// lock the list only to make a copy of it, to maximise parallelism
620                     OGRE_LOCK_RW_MUTEX_READ(mRequestHandlerMutex);
621 
622 			handlerListCopy = mRequestHandlers;
623 
624 		}
625 
626 		Response* response = 0;
627 
628 		StringUtil::StrStreamType dbgMsg;
629 		dbgMsg <<
630 #if OGRE_THREAD_SUPPORT
631 			OGRE_THREAD_CURRENT_ID
632 #else
633 			"main"
634 #endif
635 			<< "): ID=" << r->getID() << " channel=" << r->getChannel()
636 			<< " requestType=" << r->getType();
637 
638 		LogManager::getSingleton().stream(LML_TRIVIAL) <<
639 			"DefaultWorkQueueBase('" << mName << "') - PROCESS_REQUEST_START(" << dbgMsg.str();
640 
641 		RequestHandlerListByChannel::iterator i = handlerListCopy.find(r->getChannel());
642 		if (i != handlerListCopy.end())
643 		{
644 			RequestHandlerList& handlers = i->second;
645 			for (RequestHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j)
646 			{
647 				// threadsafe call which tests canHandleRequest and calls it if so
648 				response = (*j)->handleRequest(r, this);
649 
650 				if (response)
651 					break;
652 			}
653 		}
654 
655 		LogManager::getSingleton().stream(LML_TRIVIAL) <<
656 			"DefaultWorkQueueBase('" << mName << "') - PROCESS_REQUEST_END(" << dbgMsg.str()
657 			<< " processed=" << (response!=0);
658 
659 		return response;
660 
661 	}
662 	//---------------------------------------------------------------------
processResponse(Response * r)663 	void DefaultWorkQueueBase::processResponse(Response* r)
664 	{
665 		StringUtil::StrStreamType dbgMsg;
666 		dbgMsg << "thread:" <<
667 #if OGRE_THREAD_SUPPORT
668 			OGRE_THREAD_CURRENT_ID
669 #else
670 			"main"
671 #endif
672 			<< "): ID=" << r->getRequest()->getID()
673 			<< " success=" << r->succeeded() << " messages=[" << r->getMessages() << "] channel="
674 			<< r->getRequest()->getChannel() << " requestType=" << r->getRequest()->getType();
675 
676 		LogManager::getSingleton().stream(LML_TRIVIAL) <<
677 			"DefaultWorkQueueBase('" << mName << "') - PROCESS_RESPONSE_START(" << dbgMsg.str();
678 
679 		ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(r->getRequest()->getChannel());
680 		if (i != mResponseHandlers.end())
681 		{
682 			ResponseHandlerList& handlers = i->second;
683 			for (ResponseHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j)
684 			{
685 				if ((*j)->canHandleResponse(r, this))
686 				{
687 					(*j)->handleResponse(r, this);
688 				}
689 			}
690 		}
691 		LogManager::getSingleton().stream(LML_TRIVIAL) <<
692 			"DefaultWorkQueueBase('" << mName << "') - PROCESS_RESPONSE_END(" << dbgMsg.str();
693 
694 	}
695 
processIdleRequests()696 	bool DefaultWorkQueueBase::processIdleRequests()
697 	{
698 		{
699                     OGRE_LOCK_MUTEX(mIdleMutex);
700 			if(mIdleRequestQueue.empty() || mIdleThreadRunning){
701 				return false;
702 			} else {
703 				mIdleThreadRunning = true;
704 			}
705 		}
706 		try {
707 			while(1){
708 				{
709                                     OGRE_LOCK_MUTEX(mProcessMutex); // mProcessMutex needs to be the top mutex to prevent livelocks
710 					{
711                                             OGRE_LOCK_MUTEX(mIdleMutex);
712 						if(!mIdleRequestQueue.empty()){
713 							mIdleProcessed = mIdleRequestQueue.front();
714 							mIdleRequestQueue.pop_front();
715 						} else {
716 							mIdleProcessed = 0;
717 							mIdleThreadRunning = false;
718 							return true;
719 						}
720 					}
721 				}
722 				processRequestResponse(mIdleProcessed, false);
723 			}
724 		} catch (...) { // Normally this should not happen.
725 			{
726 				// It is very important to clean up or the idle thread will be locked forever!
727                             OGRE_LOCK_MUTEX(mProcessMutex);
728 				{
729                                     OGRE_LOCK_MUTEX(mIdleMutex);
730 					if(mIdleProcessed){
731 						mIdleProcessed->abortRequest();
732 					}
733 					mIdleProcessed = 0;
734 					mIdleThreadRunning = false;
735 				}
736 			}
737 			Ogre::LogManager::getSingleton().stream() << "Exception caught in top of worker thread!";
738 
739 			return true;
740 		}
741 	}
742 
743 
744 	//---------------------------------------------------------------------
745 
operator ()()746 	void DefaultWorkQueueBase::WorkerFunc::operator()()
747 	{
748 		mQueue->_threadMain();
749 	}
750 
operator ()() const751 	void DefaultWorkQueueBase::WorkerFunc::operator()() const
752 	{
753 		mQueue->_threadMain();
754 	}
755 
run()756 	void DefaultWorkQueueBase::WorkerFunc::run()
757 	{
758 		mQueue->_threadMain();
759 	}
760 }
761