1 /********************************************************************** 2 * $Id: cpl_worker_thread_pool.h 1df63aa4d30bc389133c38822002d5ecfb91c314 2020-05-21 11:30:55 +0200 Even Rouault $ 3 * 4 * Project: CPL - Common Portability Library 5 * Purpose: CPL worker thread pool 6 * Author: Even Rouault, <even dot rouault at spatialys dot com> 7 * 8 ********************************************************************** 9 * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 10 * 11 * Permission is hereby granted, free of charge, to any person obtaining a 12 * copy of this software and associated documentation files (the "Software"), 13 * to deal in the Software without restriction, including without limitation 14 * the rights to use, copy, modify, merge, publish, distribute, sublicense, 15 * and/or sell copies of the Software, and to permit persons to whom the 16 * Software is furnished to do so, subject to the following conditions: 17 * 18 * The above copyright notice and this permission notice shall be included 19 * in all copies or substantial portions of the Software. 20 * 21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 26 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 27 * DEALINGS IN THE SOFTWARE. 28 ****************************************************************************/ 29 30 #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 31 #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 32 33 #include "cpl_multiproc.h" 34 #include "cpl_list.h" 35 36 #include <condition_variable> 37 #include <memory> 38 #include <mutex> 39 #include <vector> 40 41 /** 42 * \file cpl_worker_thread_pool.h 43 * 44 * Class to manage a pool of worker threads. 45 * @since GDAL 2.1 46 */ 47 48 #ifndef DOXYGEN_SKIP 49 struct CPLWorkerThreadJob; 50 class CPLWorkerThreadPool; 51 52 struct CPLWorkerThread 53 { 54 CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 55 CPLWorkerThread() = default; 56 57 CPLThreadFunc pfnInitFunc = nullptr; 58 void *pInitData = nullptr; 59 CPLWorkerThreadPool *poTP = nullptr; 60 CPLJoinableThread *hThread = nullptr; 61 bool bMarkedAsWaiting = false; 62 63 std::mutex m_mutex{}; 64 std::condition_variable m_cv{}; 65 }; 66 67 typedef enum 68 { 69 CPLWTS_OK, 70 CPLWTS_STOP, 71 CPLWTS_ERROR 72 } CPLWorkerThreadState; 73 #endif // ndef DOXYGEN_SKIP 74 75 class CPLJobQueue; 76 77 /** Pool of worker threads */ 78 class CPL_DLL CPLWorkerThreadPool 79 { CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool)80 CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 81 82 std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 83 std::mutex m_mutex{}; 84 std::condition_variable m_cv{}; 85 volatile CPLWorkerThreadState eState = CPLWTS_OK; 86 CPLList* psJobQueue = nullptr; 87 volatile int nPendingJobs = 0; 88 89 CPLList* psWaitingWorkerThreadsList = nullptr; 90 int nWaitingWorkerThreads = 0; 91 92 static void WorkerThreadFunction(void* user_data); 93 94 void DeclareJobFinished(); 95 CPLWorkerThreadJob* GetNextJob(CPLWorkerThread* psWorkerThread); 96 97 public: 98 CPLWorkerThreadPool(); 99 ~CPLWorkerThreadPool(); 100 101 102 bool Setup(int nThreads, 103 CPLThreadFunc pfnInitFunc, 104 void** pasInitData); 105 bool Setup(int nThreads, 106 CPLThreadFunc pfnInitFunc, 107 void** pasInitData, 108 bool bWaitallStarted); 109 110 std::unique_ptr<CPLJobQueue> CreateJobQueue(); 111 112 bool SubmitJob(CPLThreadFunc pfnFunc, void* pData); 113 bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void*>& apData); 114 void WaitCompletion(int nMaxRemainingJobs = 0); 115 void WaitEvent(); 116 117 /** Return the number of threads setup */ GetThreadCount()118 int GetThreadCount() const { return static_cast<int>(aWT.size()); } 119 }; 120 121 /** Job queue */ 122 class CPL_DLL CPLJobQueue 123 { 124 CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 125 CPLWorkerThreadPool* m_poPool = nullptr; 126 std::mutex m_mutex{}; 127 std::condition_variable m_cv{}; 128 int m_nPendingJobs = 0; 129 130 static void JobQueueFunction(void*); 131 void DeclareJobFinished(); 132 133 //! @cond Doxygen_Suppress 134 protected: 135 friend class CPLWorkerThreadPool; 136 explicit CPLJobQueue(CPLWorkerThreadPool* poPool); 137 //! @endcond 138 139 public: 140 ~CPLJobQueue(); 141 142 /** Return the owning worker thread pool */ GetPool()143 CPLWorkerThreadPool* GetPool() { return m_poPool; } 144 145 bool SubmitJob(CPLThreadFunc pfnFunc, void* pData); 146 void WaitCompletion(int nMaxRemainingJobs = 0); 147 }; 148 149 #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_ 150