1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  * This file incorporates work covered by the following license notice:
10  *
11  *   Licensed to the Apache Software Foundation (ASF) under one or more
12  *   contributor license agreements. See the NOTICE file distributed
13  *   with this work for additional information regarding copyright
14  *   ownership. The ASF licenses this file to you under the Apache
15  *   License, Version 2.0 (the "License"); you may not use this file
16  *   except in compliance with the License. You may obtain a copy of
17  *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
18  */
19 
20 #include <sal/config.h>
21 
22 #include <algorithm>
23 #include <cstdlib>
24 #include <osl/diagnose.h>
25 #include <uno/threadpool.h>
26 #include <sal/log.hxx>
27 
28 #include "thread.hxx"
29 #include "jobqueue.hxx"
30 #include "threadpool.hxx"
31 
32 using namespace osl;
33 using namespace rtl;
34 
35 namespace cppu_threadpool {
36 
37 
ThreadAdmin()38     ThreadAdmin::ThreadAdmin(): m_disposed(false) {}
39 
~ThreadAdmin()40     ThreadAdmin::~ThreadAdmin()
41     {
42         SAL_WARN_IF(m_deque.size(), "cppu.threadpool", m_deque.size() << "Threads left");
43     }
44 
add_locked(rtl::Reference<ORequestThread> const & p)45     bool ThreadAdmin::add_locked( rtl::Reference< ORequestThread > const & p )
46     {
47         if( m_disposed )
48         {
49             return false;
50         }
51         m_deque.push_back( p );
52         return true;
53     }
54 
remove_locked(rtl::Reference<ORequestThread> const & p)55     void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p )
56     {
57         m_deque.erase(std::find( m_deque.begin(), m_deque.end(), p ), m_deque.end());
58     }
59 
remove(rtl::Reference<ORequestThread> const & p)60     void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p )
61     {
62         std::scoped_lock aGuard( m_mutex );
63         remove_locked( p );
64     }
65 
join()66     void ThreadAdmin::join()
67     {
68         {
69             std::scoped_lock aGuard( m_mutex );
70             m_disposed = true;
71         }
72         for (;;)
73         {
74             rtl::Reference< ORequestThread > pCurrent;
75             {
76                 std::scoped_lock aGuard( m_mutex );
77                 if( m_deque.empty() )
78                 {
79                     break;
80                 }
81                 pCurrent = m_deque.front();
82                 m_deque.pop_front();
83             }
84             if (pCurrent->getIdentifier()
85                 != osl::Thread::getCurrentIdentifier())
86             {
87                 pCurrent->join();
88             }
89         }
90     }
91 
92 
ORequestThread(ThreadPoolHolder const & aThreadPool,JobQueue * pQueue,const ByteSequence & aThreadId,bool bAsynchron)93     ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool,
94                                     JobQueue *pQueue,
95                                     const ByteSequence &aThreadId,
96                                     bool bAsynchron )
97         : m_aThreadPool( aThreadPool )
98         , m_pQueue( pQueue )
99         , m_aThreadId( aThreadId )
100         , m_bAsynchron( bAsynchron )
101     {}
102 
~ORequestThread()103     ORequestThread::~ORequestThread() {}
104 
setTask(JobQueue * pQueue,const ByteSequence & aThreadId,bool bAsynchron)105     void ORequestThread::setTask( JobQueue *pQueue,
106                                   const ByteSequence &aThreadId,
107                                   bool bAsynchron )
108     {
109         m_pQueue = pQueue;
110         m_aThreadId = aThreadId;
111         m_bAsynchron = bAsynchron;
112     }
113 
launch()114     bool ORequestThread::launch()
115     {
116         // Assumption is that osl::Thread::create returns normally with a true
117         // return value iff it causes osl::Thread::run to start executing:
118         acquire();
119         ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin();
120         std::unique_lock g(rThreadAdmin.m_mutex);
121         if (!rThreadAdmin.add_locked( this )) {
122             return false;
123         }
124         try {
125             if (!create()) {
126                 std::abort();
127             }
128         } catch (...) {
129             rThreadAdmin.remove_locked( this );
130             g.release();
131             release();
132             throw;
133         }
134         return true;
135     }
136 
onTerminated()137     void ORequestThread::onTerminated()
138     {
139         m_aThreadPool->getThreadAdmin().remove( this );
140         release();
141     }
142 
run()143     void ORequestThread::run()
144     {
145         osl_setThreadName("cppu_threadpool::ORequestThread");
146 
147         try
148         {
149             while ( m_pQueue )
150             {
151                 if( ! m_bAsynchron )
152                 {
153                     if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
154                     {
155                         OSL_ASSERT( false );
156                     }
157                 }
158 
159                 while( ! m_pQueue->isEmpty() )
160                 {
161                     // Note : Oneways should not get a disposable disposeid,
162                     //        It does not make sense to dispose a call in this state.
163                     //        That's way we put it a disposeid, that can't be used otherwise.
164                     m_pQueue->enter(
165                         this,
166                         true );
167 
168                     if( m_pQueue->isEmpty() )
169                     {
170                         m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
171                         // Note : revokeQueue might have failed because m_pQueue.isEmpty()
172                         //        may be false (race).
173                     }
174                 }
175 
176                 delete m_pQueue;
177                 m_pQueue = nullptr;
178 
179                 if( ! m_bAsynchron )
180                 {
181                     uno_releaseIdFromCurrentThread();
182                 }
183 
184                 m_aThreadPool->waitInPool( this );
185             }
186         }
187         catch (...)
188         {
189             // Work around the problem that onTerminated is not called if run
190             // throws an exception:
191             onTerminated();
192             throw;
193         }
194     }
195 }
196 
197 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
198