1 /**********************************************************************
2  * $Id: cpl_worker_thread_pool.h 1df63aa4d30bc389133c38822002d5ecfb91c314 2020-05-21 11:30:55 +0200 Even Rouault $
3  *
4  * Project:  CPL - Common Portability Library
5  * Purpose:  CPL worker thread pool
6  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
7  *
8  **********************************************************************
9  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a
12  * copy of this software and associated documentation files (the "Software"),
13  * to deal in the Software without restriction, including without limitation
14  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
15  * and/or sell copies of the Software, and to permit persons to whom the
16  * Software is furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included
19  * in all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
24  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
27  * DEALINGS IN THE SOFTWARE.
28  ****************************************************************************/
29 
30 #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
31 #define CPL_WORKER_THREAD_POOL_H_INCLUDED_
32 
33 #include "cpl_multiproc.h"
34 #include "cpl_list.h"
35 
36 #include <condition_variable>
37 #include <memory>
38 #include <mutex>
39 #include <vector>
40 
41 /**
42  * \file cpl_worker_thread_pool.h
43  *
44  * Class to manage a pool of worker threads.
45  * @since GDAL 2.1
46  */
47 
48 #ifndef DOXYGEN_SKIP
49 struct CPLWorkerThreadJob;
50 class CPLWorkerThreadPool;
51 
52 struct CPLWorkerThread
53 {
54     CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
55     CPLWorkerThread() = default;
56 
57     CPLThreadFunc        pfnInitFunc = nullptr;
58     void                *pInitData = nullptr;
59     CPLWorkerThreadPool *poTP = nullptr;
60     CPLJoinableThread   *hThread = nullptr;
61     bool                 bMarkedAsWaiting = false;
62 
63     std::mutex              m_mutex{};
64     std::condition_variable m_cv{};
65 };
66 
67 typedef enum
68 {
69     CPLWTS_OK,
70     CPLWTS_STOP,
71     CPLWTS_ERROR
72 } CPLWorkerThreadState;
73 #endif  // ndef DOXYGEN_SKIP
74 
75 class CPLJobQueue;
76 
77 /** Pool of worker threads */
78 class CPL_DLL CPLWorkerThreadPool
79 {
CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool)80         CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool)
81 
82         std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};
83         std::mutex              m_mutex{};
84         std::condition_variable m_cv{};
85         volatile CPLWorkerThreadState eState = CPLWTS_OK;
86         CPLList* psJobQueue = nullptr;
87         volatile int nPendingJobs = 0;
88 
89         CPLList* psWaitingWorkerThreadsList = nullptr;
90         int nWaitingWorkerThreads = 0;
91 
92         static void WorkerThreadFunction(void* user_data);
93 
94         void DeclareJobFinished();
95         CPLWorkerThreadJob* GetNextJob(CPLWorkerThread* psWorkerThread);
96 
97     public:
98         CPLWorkerThreadPool();
99        ~CPLWorkerThreadPool();
100 
101 
102         bool Setup(int nThreads,
103                    CPLThreadFunc pfnInitFunc,
104                    void** pasInitData);
105         bool Setup(int nThreads,
106                    CPLThreadFunc pfnInitFunc,
107                    void** pasInitData,
108                    bool bWaitallStarted);
109 
110         std::unique_ptr<CPLJobQueue> CreateJobQueue();
111 
112         bool SubmitJob(CPLThreadFunc pfnFunc, void* pData);
113         bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void*>& apData);
114         void WaitCompletion(int nMaxRemainingJobs = 0);
115         void WaitEvent();
116 
117         /** Return the number of threads setup */
GetThreadCount()118         int GetThreadCount() const { return static_cast<int>(aWT.size()); }
119 };
120 
121 /** Job queue */
122 class CPL_DLL CPLJobQueue
123 {
124         CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue)
125         CPLWorkerThreadPool* m_poPool = nullptr;
126         std::mutex m_mutex{};
127         std::condition_variable m_cv{};
128         int m_nPendingJobs = 0;
129 
130         static void JobQueueFunction(void*);
131         void DeclareJobFinished();
132 
133 //! @cond Doxygen_Suppress
134 protected:
135         friend class CPLWorkerThreadPool;
136         explicit CPLJobQueue(CPLWorkerThreadPool* poPool);
137 //! @endcond
138 
139 public:
140         ~CPLJobQueue();
141 
142         /** Return the owning worker thread pool */
GetPool()143         CPLWorkerThreadPool* GetPool() { return m_poPool; }
144 
145         bool SubmitJob(CPLThreadFunc pfnFunc, void* pData);
146         void WaitCompletion(int nMaxRemainingJobs = 0);
147 };
148 
149 #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_
150