1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 2 /* 3 * This file is part of the LibreOffice project. 4 * 5 * This Source Code Form is subject to the terms of the Mozilla Public 6 * License, v. 2.0. If a copy of the MPL was not distributed with this 7 * file, You can obtain one at http://mozilla.org/MPL/2.0/. 8 * 9 * This file incorporates work covered by the following license notice: 10 * 11 * Licensed to the Apache Software Foundation (ASF) under one or more 12 * contributor license agreements. See the NOTICE file distributed 13 * with this work for additional information regarding copyright 14 * ownership. The ASF licenses this file to you under the Apache 15 * License, Version 2.0 (the "License"); you may not use this file 16 * except in compliance with the License. You may obtain a copy of 17 * the License at http://www.apache.org/licenses/LICENSE-2.0 . 18 */ 19 20 #include <sal/config.h> 21 22 #include <algorithm> 23 #include <cstdlib> 24 #include <osl/diagnose.h> 25 #include <uno/threadpool.h> 26 #include <sal/log.hxx> 27 28 #include "thread.hxx" 29 #include "jobqueue.hxx" 30 #include "threadpool.hxx" 31 32 using namespace osl; 33 using namespace rtl; 34 35 namespace cppu_threadpool { 36 37 ThreadAdmin()38 ThreadAdmin::ThreadAdmin(): m_disposed(false) {} 39 ~ThreadAdmin()40 ThreadAdmin::~ThreadAdmin() 41 { 42 SAL_WARN_IF(m_deque.size(), "cppu.threadpool", m_deque.size() << "Threads left"); 43 } 44 add_locked(rtl::Reference<ORequestThread> const & p)45 bool ThreadAdmin::add_locked( rtl::Reference< ORequestThread > const & p ) 46 { 47 if( m_disposed ) 48 { 49 return false; 50 } 51 m_deque.push_back( p ); 52 return true; 53 } 54 remove_locked(rtl::Reference<ORequestThread> const & p)55 void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) 56 { 57 m_deque.erase(std::find( m_deque.begin(), m_deque.end(), p ), m_deque.end()); 58 } 59 remove(rtl::Reference<ORequestThread> const & p)60 void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) 61 { 62 std::scoped_lock aGuard( m_mutex ); 63 remove_locked( p ); 64 } 65 join()66 void ThreadAdmin::join() 67 { 68 { 69 std::scoped_lock aGuard( m_mutex ); 70 m_disposed = true; 71 } 72 for (;;) 73 { 74 rtl::Reference< ORequestThread > pCurrent; 75 { 76 std::scoped_lock aGuard( m_mutex ); 77 if( m_deque.empty() ) 78 { 79 break; 80 } 81 pCurrent = m_deque.front(); 82 m_deque.pop_front(); 83 } 84 if (pCurrent->getIdentifier() 85 != osl::Thread::getCurrentIdentifier()) 86 { 87 pCurrent->join(); 88 } 89 } 90 } 91 92 ORequestThread(ThreadPoolHolder const & aThreadPool,JobQueue * pQueue,const ByteSequence & aThreadId,bool bAsynchron)93 ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool, 94 JobQueue *pQueue, 95 const ByteSequence &aThreadId, 96 bool bAsynchron ) 97 : m_aThreadPool( aThreadPool ) 98 , m_pQueue( pQueue ) 99 , m_aThreadId( aThreadId ) 100 , m_bAsynchron( bAsynchron ) 101 {} 102 ~ORequestThread()103 ORequestThread::~ORequestThread() {} 104 setTask(JobQueue * pQueue,const ByteSequence & aThreadId,bool bAsynchron)105 void ORequestThread::setTask( JobQueue *pQueue, 106 const ByteSequence &aThreadId, 107 bool bAsynchron ) 108 { 109 m_pQueue = pQueue; 110 m_aThreadId = aThreadId; 111 m_bAsynchron = bAsynchron; 112 } 113 launch()114 bool ORequestThread::launch() 115 { 116 // Assumption is that osl::Thread::create returns normally with a true 117 // return value iff it causes osl::Thread::run to start executing: 118 acquire(); 119 ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); 120 std::unique_lock g(rThreadAdmin.m_mutex); 121 if (!rThreadAdmin.add_locked( this )) { 122 return false; 123 } 124 try { 125 if (!create()) { 126 std::abort(); 127 } 128 } catch (...) { 129 rThreadAdmin.remove_locked( this ); 130 g.release(); 131 release(); 132 throw; 133 } 134 return true; 135 } 136 onTerminated()137 void ORequestThread::onTerminated() 138 { 139 m_aThreadPool->getThreadAdmin().remove( this ); 140 release(); 141 } 142 run()143 void ORequestThread::run() 144 { 145 osl_setThreadName("cppu_threadpool::ORequestThread"); 146 147 try 148 { 149 while ( m_pQueue ) 150 { 151 if( ! m_bAsynchron ) 152 { 153 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) 154 { 155 OSL_ASSERT( false ); 156 } 157 } 158 159 while( ! m_pQueue->isEmpty() ) 160 { 161 // Note : Oneways should not get a disposable disposeid, 162 // It does not make sense to dispose a call in this state. 163 // That's way we put it a disposeid, that can't be used otherwise. 164 m_pQueue->enter( 165 this, 166 true ); 167 168 if( m_pQueue->isEmpty() ) 169 { 170 m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); 171 // Note : revokeQueue might have failed because m_pQueue.isEmpty() 172 // may be false (race). 173 } 174 } 175 176 delete m_pQueue; 177 m_pQueue = nullptr; 178 179 if( ! m_bAsynchron ) 180 { 181 uno_releaseIdFromCurrentThread(); 182 } 183 184 m_aThreadPool->waitInPool( this ); 185 } 186 } 187 catch (...) 188 { 189 // Work around the problem that onTerminated is not called if run 190 // throws an exception: 191 onTerminated(); 192 throw; 193 } 194 } 195 } 196 197 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ 198