1 /* 2 ----------------------------------------------------------------------------- 3 This source file is part of OGRE 4 (Object-oriented Graphics Rendering Engine) 5 For the latest info, see http://www.ogre3d.org/ 6 7 Copyright (c) 2000-2013 Torus Knot Software Ltd 8 9 Permission is hereby granted, free of charge, to any person obtaining a copy 10 of this software and associated documentation files (the "Software"), to deal 11 in the Software without restriction, including without limitation the rights 12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 13 copies of the Software, and to permit persons to whom the Software is 14 furnished to do so, subject to the following conditions: 15 16 The above copyright notice and this permission notice shall be included in 17 all copies or substantial portions of the Software. 18 19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 25 THE SOFTWARE. 26 ----------------------------------------------------------------------------- 27 */ 28 #include "OgreStableHeaders.h" 29 #include "OgreWorkQueue.h" 30 #include "OgreLogManager.h" 31 #include "OgreRoot.h" 32 #include "OgreRenderSystem.h" 33 34 namespace Ogre { 35 //--------------------------------------------------------------------- getChannel(const String & channelName)36 uint16 WorkQueue::getChannel(const String& channelName) 37 { 38 OGRE_LOCK_MUTEX(mChannelMapMutex); 39 40 ChannelMap::iterator i = mChannelMap.find(channelName); 41 if (i == mChannelMap.end()) 42 { 43 i = mChannelMap.insert(ChannelMap::value_type(channelName, mNextChannel++)).first; 44 } 45 return i->second; 46 } 47 //--------------------------------------------------------------------- Request(uint16 channel,uint16 rtype,const Any & rData,uint8 retry,RequestID rid)48 WorkQueue::Request::Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid) 49 : mChannel(channel), mType(rtype), mData(rData), mRetryCount(retry), mID(rid), mAborted(false) 50 { 51 52 } 53 //--------------------------------------------------------------------- ~Request()54 WorkQueue::Request::~Request() 55 { 56 57 } 58 //--------------------------------------------------------------------- 59 //--------------------------------------------------------------------- Response(const Request * rq,bool success,const Any & data,const String & msg)60 WorkQueue::Response::Response(const Request* rq, bool success, const Any& data, const String& msg) 61 : mRequest(rq), mSuccess(success), mMessages(msg), mData(data) 62 { 63 64 } 65 //--------------------------------------------------------------------- ~Response()66 WorkQueue::Response::~Response() 67 { 68 OGRE_DELETE mRequest; 69 } 70 //--------------------------------------------------------------------- 71 //--------------------------------------------------------------------- DefaultWorkQueueBase(const String & name)72 DefaultWorkQueueBase::DefaultWorkQueueBase(const String& name) 73 : mName(name) 74 , mWorkerThreadCount(1) 75 , mWorkerRenderSystemAccess(false) 76 , mIsRunning(false) 77 , mResposeTimeLimitMS(8) 78 , mWorkerFunc(0) 79 , mRequestCount(0) 80 , mPaused(false) 81 , mAcceptRequests(true) 82 , mShuttingDown(false) 83 , mIdleProcessed(0) 84 { 85 } 86 //--------------------------------------------------------------------- getName() const87 const String& DefaultWorkQueueBase::getName() const 88 { 89 return mName; 90 } 91 //--------------------------------------------------------------------- getWorkerThreadCount() const92 size_t DefaultWorkQueueBase::getWorkerThreadCount() const 93 { 94 return mWorkerThreadCount; 95 } 96 //--------------------------------------------------------------------- setWorkerThreadCount(size_t c)97 void DefaultWorkQueueBase::setWorkerThreadCount(size_t c) 98 { 99 mWorkerThreadCount = c; 100 } 101 //--------------------------------------------------------------------- getWorkersCanAccessRenderSystem() const102 bool DefaultWorkQueueBase::getWorkersCanAccessRenderSystem() const 103 { 104 return mWorkerRenderSystemAccess; 105 } 106 //--------------------------------------------------------------------- setWorkersCanAccessRenderSystem(bool access)107 void DefaultWorkQueueBase::setWorkersCanAccessRenderSystem(bool access) 108 { 109 mWorkerRenderSystemAccess = access; 110 } 111 //--------------------------------------------------------------------- ~DefaultWorkQueueBase()112 DefaultWorkQueueBase::~DefaultWorkQueueBase() 113 { 114 //shutdown(); // can't call here; abstract function 115 116 for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i) 117 { 118 OGRE_DELETE (*i); 119 } 120 mRequestQueue.clear(); 121 122 for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i) 123 { 124 OGRE_DELETE (*i); 125 } 126 mResponseQueue.clear(); 127 } 128 //--------------------------------------------------------------------- addRequestHandler(uint16 channel,RequestHandler * rh)129 void DefaultWorkQueueBase::addRequestHandler(uint16 channel, RequestHandler* rh) 130 { 131 OGRE_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex); 132 133 RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel); 134 if (i == mRequestHandlers.end()) 135 i = mRequestHandlers.insert(RequestHandlerListByChannel::value_type(channel, RequestHandlerList())).first; 136 137 RequestHandlerList& handlers = i->second; 138 bool duplicate = false; 139 for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j) 140 { 141 if ((*j)->getHandler() == rh) 142 { 143 duplicate = true; 144 break; 145 } 146 } 147 if (!duplicate) 148 handlers.push_back(RequestHandlerHolderPtr(OGRE_NEW RequestHandlerHolder(rh))); 149 150 } 151 //--------------------------------------------------------------------- removeRequestHandler(uint16 channel,RequestHandler * rh)152 void DefaultWorkQueueBase::removeRequestHandler(uint16 channel, RequestHandler* rh) 153 { 154 OGRE_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex); 155 156 RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel); 157 if (i != mRequestHandlers.end()) 158 { 159 RequestHandlerList& handlers = i->second; 160 for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j) 161 { 162 if ((*j)->getHandler() == rh) 163 { 164 // Disconnect - this will make it safe across copies of the list 165 // this is threadsafe and will wait for existing processes to finish 166 (*j)->disconnectHandler(); 167 handlers.erase(j); 168 break; 169 } 170 } 171 172 } 173 174 } 175 //--------------------------------------------------------------------- addResponseHandler(uint16 channel,ResponseHandler * rh)176 void DefaultWorkQueueBase::addResponseHandler(uint16 channel, ResponseHandler* rh) 177 { 178 ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel); 179 if (i == mResponseHandlers.end()) 180 i = mResponseHandlers.insert(ResponseHandlerListByChannel::value_type(channel, ResponseHandlerList())).first; 181 182 ResponseHandlerList& handlers = i->second; 183 if (std::find(handlers.begin(), handlers.end(), rh) == handlers.end()) 184 handlers.push_back(rh); 185 } 186 //--------------------------------------------------------------------- removeResponseHandler(uint16 channel,ResponseHandler * rh)187 void DefaultWorkQueueBase::removeResponseHandler(uint16 channel, ResponseHandler* rh) 188 { 189 ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel); 190 if (i != mResponseHandlers.end()) 191 { 192 ResponseHandlerList& handlers = i->second; 193 ResponseHandlerList::iterator j = std::find( 194 handlers.begin(), handlers.end(), rh); 195 if (j != handlers.end()) 196 handlers.erase(j); 197 198 } 199 } 200 //--------------------------------------------------------------------- addRequest(uint16 channel,uint16 requestType,const Any & rData,uint8 retryCount,bool forceSynchronous,bool idleThread)201 WorkQueue::RequestID DefaultWorkQueueBase::addRequest(uint16 channel, uint16 requestType, 202 const Any& rData, uint8 retryCount, bool forceSynchronous, bool idleThread) 203 { 204 Request* req = 0; 205 RequestID rid = 0; 206 207 { 208 // lock to acquire rid and push request to the queue 209 OGRE_LOCK_MUTEX(mRequestMutex); 210 211 if (!mAcceptRequests || mShuttingDown) 212 return 0; 213 214 rid = ++mRequestCount; 215 req = OGRE_NEW Request(channel, requestType, rData, retryCount, rid); 216 217 LogManager::getSingleton().stream(LML_TRIVIAL) << 218 "DefaultWorkQueueBase('" << mName << "') - QUEUED(thread:" << 219 #if OGRE_THREAD_SUPPORT 220 OGRE_THREAD_CURRENT_ID 221 #else 222 "main" 223 #endif 224 << "): ID=" << rid 225 << " channel=" << channel << " requestType=" << requestType; 226 #if OGRE_THREAD_SUPPORT 227 if (!forceSynchronous&& !idleThread) 228 { 229 mRequestQueue.push_back(req); 230 notifyWorkers(); 231 return rid; 232 } 233 #endif 234 } 235 if(idleThread){ 236 OGRE_LOCK_MUTEX(mIdleMutex); 237 mIdleRequestQueue.push_back(req); 238 if(!mIdleThreadRunning) 239 { 240 notifyWorkers(); 241 } 242 } else { //forceSynchronous 243 processRequestResponse(req, true); 244 } 245 return rid; 246 247 } 248 //--------------------------------------------------------------------- addRequestWithRID(WorkQueue::RequestID rid,uint16 channel,uint16 requestType,const Any & rData,uint8 retryCount)249 void DefaultWorkQueueBase::addRequestWithRID(WorkQueue::RequestID rid, uint16 channel, 250 uint16 requestType, const Any& rData, uint8 retryCount) 251 { 252 // lock to push request to the queue 253 OGRE_LOCK_MUTEX(mRequestMutex); 254 255 if (mShuttingDown) 256 return; 257 258 Request* req = OGRE_NEW Request(channel, requestType, rData, retryCount, rid); 259 260 LogManager::getSingleton().stream(LML_TRIVIAL) << 261 "DefaultWorkQueueBase('" << mName << "') - REQUEUED(thread:" << 262 #if OGRE_THREAD_SUPPORT 263 OGRE_THREAD_CURRENT_ID 264 #else 265 "main" 266 #endif 267 << "): ID=" << rid 268 << " channel=" << channel << " requestType=" << requestType; 269 #if OGRE_THREAD_SUPPORT 270 mRequestQueue.push_back(req); 271 notifyWorkers(); 272 #else 273 processRequestResponse(req, true); 274 #endif 275 } 276 //--------------------------------------------------------------------- abortRequest(RequestID id)277 void DefaultWorkQueueBase::abortRequest(RequestID id) 278 { 279 OGRE_LOCK_MUTEX(mProcessMutex); 280 281 // NOTE: Pending requests are exist any of RequestQueue, ProcessQueue and 282 // ResponseQueue when keeping ProcessMutex, so we check all of these queues. 283 284 for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i) 285 { 286 if ((*i)->getID() == id) 287 { 288 (*i)->abortRequest(); 289 break; 290 } 291 } 292 293 { 294 OGRE_LOCK_MUTEX(mRequestMutex); 295 296 for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i) 297 { 298 if ((*i)->getID() == id) 299 { 300 (*i)->abortRequest(); 301 break; 302 } 303 } 304 } 305 306 { 307 if(mIdleProcessed) 308 { 309 mIdleProcessed->abortRequest(); 310 } 311 312 OGRE_LOCK_MUTEX(mIdleMutex); 313 for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i) 314 { 315 (*i)->abortRequest(); 316 } 317 } 318 319 { 320 OGRE_LOCK_MUTEX(mResponseMutex); 321 322 for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i) 323 { 324 if( (*i)->getRequest()->getID() == id ) 325 { 326 (*i)->abortRequest(); 327 break; 328 } 329 } 330 } 331 } 332 //--------------------------------------------------------------------- abortRequestsByChannel(uint16 channel)333 void DefaultWorkQueueBase::abortRequestsByChannel(uint16 channel) 334 { 335 OGRE_LOCK_MUTEX(mProcessMutex); 336 337 for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i) 338 { 339 if ((*i)->getChannel() == channel) 340 { 341 (*i)->abortRequest(); 342 } 343 } 344 345 { 346 OGRE_LOCK_MUTEX(mRequestMutex); 347 348 for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i) 349 { 350 if ((*i)->getChannel() == channel) 351 { 352 (*i)->abortRequest(); 353 } 354 } 355 } 356 { 357 if (mIdleProcessed && mIdleProcessed->getChannel() == channel) 358 { 359 mIdleProcessed->abortRequest(); 360 } 361 362 OGRE_LOCK_MUTEX(mIdleMutex); 363 364 for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i) 365 { 366 if ((*i)->getChannel() == channel) 367 { 368 (*i)->abortRequest(); 369 } 370 } 371 } 372 373 { 374 OGRE_LOCK_MUTEX(mResponseMutex); 375 376 for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i) 377 { 378 if( (*i)->getRequest()->getChannel() == channel ) 379 { 380 (*i)->abortRequest(); 381 } 382 } 383 } 384 } 385 //--------------------------------------------------------------------- abortPendingRequestsByChannel(uint16 channel)386 void DefaultWorkQueueBase::abortPendingRequestsByChannel(uint16 channel) 387 { 388 { 389 OGRE_LOCK_MUTEX(mRequestMutex); 390 for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i) 391 { 392 if ((*i)->getChannel() == channel) 393 { 394 (*i)->abortRequest(); 395 } 396 } 397 } 398 { 399 OGRE_LOCK_MUTEX(mIdleMutex); 400 401 for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i) 402 { 403 if ((*i)->getChannel() == channel) 404 { 405 (*i)->abortRequest(); 406 } 407 } 408 } 409 } 410 //--------------------------------------------------------------------- abortAllRequests()411 void DefaultWorkQueueBase::abortAllRequests() 412 { 413 OGRE_LOCK_MUTEX(mProcessMutex); 414 { 415 for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i) 416 { 417 (*i)->abortRequest(); 418 } 419 } 420 421 422 { 423 OGRE_LOCK_MUTEX(mRequestMutex); 424 425 for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i) 426 { 427 (*i)->abortRequest(); 428 } 429 } 430 431 { 432 433 if(mIdleProcessed) 434 { 435 mIdleProcessed->abortRequest(); 436 } 437 438 OGRE_LOCK_MUTEX(mIdleMutex); 439 440 for (RequestQueue::iterator i = mIdleRequestQueue.begin(); i != mIdleRequestQueue.end(); ++i) 441 { 442 (*i)->abortRequest(); 443 } 444 } 445 446 { 447 OGRE_LOCK_MUTEX(mResponseMutex); 448 449 for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i) 450 { 451 (*i)->abortRequest(); 452 } 453 } 454 455 } 456 //--------------------------------------------------------------------- setPaused(bool pause)457 void DefaultWorkQueueBase::setPaused(bool pause) 458 { 459 OGRE_LOCK_MUTEX(mRequestMutex); 460 461 mPaused = pause; 462 } 463 //--------------------------------------------------------------------- isPaused() const464 bool DefaultWorkQueueBase::isPaused() const 465 { 466 return mPaused; 467 } 468 //--------------------------------------------------------------------- setRequestsAccepted(bool accept)469 void DefaultWorkQueueBase::setRequestsAccepted(bool accept) 470 { 471 OGRE_LOCK_MUTEX(mRequestMutex); 472 473 mAcceptRequests = accept; 474 } 475 //--------------------------------------------------------------------- getRequestsAccepted() const476 bool DefaultWorkQueueBase::getRequestsAccepted() const 477 { 478 return mAcceptRequests; 479 } 480 //--------------------------------------------------------------------- _processNextRequest()481 void DefaultWorkQueueBase::_processNextRequest() 482 { 483 if(processIdleRequests()){ 484 // Found idle requests. 485 return; 486 } 487 Request* request = 0; 488 { 489 // scoped to only lock while retrieving the next request 490 OGRE_LOCK_MUTEX(mProcessMutex); 491 { 492 OGRE_LOCK_MUTEX(mRequestMutex); 493 494 if (!mRequestQueue.empty()) 495 { 496 request = mRequestQueue.front(); 497 mRequestQueue.pop_front(); 498 mProcessQueue.push_back( request ); 499 } 500 } 501 } 502 503 if (request) 504 { 505 processRequestResponse(request, false); 506 } 507 508 509 } 510 //--------------------------------------------------------------------- processRequestResponse(Request * r,bool synchronous)511 void DefaultWorkQueueBase::processRequestResponse(Request* r, bool synchronous) 512 { 513 Response* response = processRequest(r); 514 515 OGRE_LOCK_MUTEX(mProcessMutex); 516 517 RequestQueue::iterator it; 518 for( it = mProcessQueue.begin(); it != mProcessQueue.end(); ++it ) 519 { 520 if( (*it) == r ) 521 { 522 mProcessQueue.erase( it ); 523 break; 524 } 525 } 526 if( mIdleProcessed == r ) 527 { 528 mIdleProcessed = 0; 529 } 530 if (response) 531 { 532 if (!response->succeeded()) 533 { 534 // Failed, should we retry? 535 const Request* req = response->getRequest(); 536 if (req->getRetryCount()) 537 { 538 addRequestWithRID(req->getID(), req->getChannel(), req->getType(), req->getData(), 539 req->getRetryCount() - 1); 540 // discard response (this also deletes request) 541 OGRE_DELETE response; 542 return; 543 } 544 } 545 if (synchronous) 546 { 547 processResponse(response); 548 OGRE_DELETE response; 549 } 550 else 551 { 552 if( response->getRequest()->getAborted() ) 553 { 554 // destroy response user data 555 response->abortRequest(); 556 } 557 // Queue response 558 OGRE_LOCK_MUTEX(mResponseMutex); 559 mResponseQueue.push_back(response); 560 // no need to wake thread, this is processed by the main thread 561 } 562 563 } 564 else 565 { 566 // no response, delete request 567 LogManager::getSingleton().stream() << 568 "DefaultWorkQueueBase('" << mName << "') warning: no handler processed request " 569 << r->getID() << ", channel " << r->getChannel() 570 << ", type " << r->getType(); 571 OGRE_DELETE r; 572 } 573 574 } 575 //--------------------------------------------------------------------- processResponses()576 void DefaultWorkQueueBase::processResponses() 577 { 578 unsigned long msStart = Root::getSingleton().getTimer()->getMilliseconds(); 579 unsigned long msCurrent = 0; 580 581 // keep going until we run out of responses or out of time 582 while(true) 583 { 584 Response* response = 0; 585 { 586 OGRE_LOCK_MUTEX(mResponseMutex); 587 588 if (mResponseQueue.empty()) 589 break; // exit loop 590 else 591 { 592 response = mResponseQueue.front(); 593 mResponseQueue.pop_front(); 594 } 595 } 596 597 if (response) 598 { 599 processResponse(response); 600 601 OGRE_DELETE response; 602 603 } 604 605 // time limit 606 if (mResposeTimeLimitMS) 607 { 608 msCurrent = Root::getSingleton().getTimer()->getMilliseconds(); 609 if (msCurrent - msStart > mResposeTimeLimitMS) 610 break; 611 } 612 } 613 } 614 //--------------------------------------------------------------------- processRequest(Request * r)615 WorkQueue::Response* DefaultWorkQueueBase::processRequest(Request* r) 616 { 617 RequestHandlerListByChannel handlerListCopy; 618 { 619 // lock the list only to make a copy of it, to maximise parallelism 620 OGRE_LOCK_RW_MUTEX_READ(mRequestHandlerMutex); 621 622 handlerListCopy = mRequestHandlers; 623 624 } 625 626 Response* response = 0; 627 628 StringUtil::StrStreamType dbgMsg; 629 dbgMsg << 630 #if OGRE_THREAD_SUPPORT 631 OGRE_THREAD_CURRENT_ID 632 #else 633 "main" 634 #endif 635 << "): ID=" << r->getID() << " channel=" << r->getChannel() 636 << " requestType=" << r->getType(); 637 638 LogManager::getSingleton().stream(LML_TRIVIAL) << 639 "DefaultWorkQueueBase('" << mName << "') - PROCESS_REQUEST_START(" << dbgMsg.str(); 640 641 RequestHandlerListByChannel::iterator i = handlerListCopy.find(r->getChannel()); 642 if (i != handlerListCopy.end()) 643 { 644 RequestHandlerList& handlers = i->second; 645 for (RequestHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j) 646 { 647 // threadsafe call which tests canHandleRequest and calls it if so 648 response = (*j)->handleRequest(r, this); 649 650 if (response) 651 break; 652 } 653 } 654 655 LogManager::getSingleton().stream(LML_TRIVIAL) << 656 "DefaultWorkQueueBase('" << mName << "') - PROCESS_REQUEST_END(" << dbgMsg.str() 657 << " processed=" << (response!=0); 658 659 return response; 660 661 } 662 //--------------------------------------------------------------------- processResponse(Response * r)663 void DefaultWorkQueueBase::processResponse(Response* r) 664 { 665 StringUtil::StrStreamType dbgMsg; 666 dbgMsg << "thread:" << 667 #if OGRE_THREAD_SUPPORT 668 OGRE_THREAD_CURRENT_ID 669 #else 670 "main" 671 #endif 672 << "): ID=" << r->getRequest()->getID() 673 << " success=" << r->succeeded() << " messages=[" << r->getMessages() << "] channel=" 674 << r->getRequest()->getChannel() << " requestType=" << r->getRequest()->getType(); 675 676 LogManager::getSingleton().stream(LML_TRIVIAL) << 677 "DefaultWorkQueueBase('" << mName << "') - PROCESS_RESPONSE_START(" << dbgMsg.str(); 678 679 ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(r->getRequest()->getChannel()); 680 if (i != mResponseHandlers.end()) 681 { 682 ResponseHandlerList& handlers = i->second; 683 for (ResponseHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j) 684 { 685 if ((*j)->canHandleResponse(r, this)) 686 { 687 (*j)->handleResponse(r, this); 688 } 689 } 690 } 691 LogManager::getSingleton().stream(LML_TRIVIAL) << 692 "DefaultWorkQueueBase('" << mName << "') - PROCESS_RESPONSE_END(" << dbgMsg.str(); 693 694 } 695 processIdleRequests()696 bool DefaultWorkQueueBase::processIdleRequests() 697 { 698 { 699 OGRE_LOCK_MUTEX(mIdleMutex); 700 if(mIdleRequestQueue.empty() || mIdleThreadRunning){ 701 return false; 702 } else { 703 mIdleThreadRunning = true; 704 } 705 } 706 try { 707 while(1){ 708 { 709 OGRE_LOCK_MUTEX(mProcessMutex); // mProcessMutex needs to be the top mutex to prevent livelocks 710 { 711 OGRE_LOCK_MUTEX(mIdleMutex); 712 if(!mIdleRequestQueue.empty()){ 713 mIdleProcessed = mIdleRequestQueue.front(); 714 mIdleRequestQueue.pop_front(); 715 } else { 716 mIdleProcessed = 0; 717 mIdleThreadRunning = false; 718 return true; 719 } 720 } 721 } 722 processRequestResponse(mIdleProcessed, false); 723 } 724 } catch (...) { // Normally this should not happen. 725 { 726 // It is very important to clean up or the idle thread will be locked forever! 727 OGRE_LOCK_MUTEX(mProcessMutex); 728 { 729 OGRE_LOCK_MUTEX(mIdleMutex); 730 if(mIdleProcessed){ 731 mIdleProcessed->abortRequest(); 732 } 733 mIdleProcessed = 0; 734 mIdleThreadRunning = false; 735 } 736 } 737 Ogre::LogManager::getSingleton().stream() << "Exception caught in top of worker thread!"; 738 739 return true; 740 } 741 } 742 743 744 //--------------------------------------------------------------------- 745 operator ()()746 void DefaultWorkQueueBase::WorkerFunc::operator()() 747 { 748 mQueue->_threadMain(); 749 } 750 operator ()() const751 void DefaultWorkQueueBase::WorkerFunc::operator()() const 752 { 753 mQueue->_threadMain(); 754 } 755 run()756 void DefaultWorkQueueBase::WorkerFunc::run() 757 { 758 mQueue->_threadMain(); 759 } 760 } 761