1 /*****************************************************************************
2  * Copyright (C) 2013-2020 MulticoreWare, Inc
3  *
4  * Authors: Steve Borho <steve@borho.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
19  *
20  * This program is also available under a commercial proprietary license.
21  * For more information, contact us at license @ x265.com
22  *****************************************************************************/
23 
24 #ifndef X265_THREADPOOL_H
25 #define X265_THREADPOOL_H
26 
27 #include "common.h"
28 #include "threading.h"
29 
30 namespace X265_NS {
31 // x265 private namespace
32 
// Forward declarations of the thread pool types referenced in this header.
class ThreadPool;
class WorkerThread;
class BondedTaskGroup;

// Integer type used for worker-thread bitmaps: 64 bits wide when X86_64 is
// set, 32 bits wide otherwise.
#if X86_64
typedef uint64_t sleepbitmap_t;
#else
typedef uint32_t sleepbitmap_t;
#endif

// Bitmap with every bit set; passed as the peer bitmap when any pool thread
// is acceptable.
static const sleepbitmap_t ALL_POOL_THREADS = ~(sleepbitmap_t)0;

// Number of bits available in a sleepbitmap_t.
enum { MAX_POOL_THREADS = 8 * sizeof(sleepbitmap_t) };

// A value larger than any X265_TYPE_* macro.
enum { INVALID_SLICE_PRIORITY = 10 };
46 
47 // Frame level job providers. FrameEncoder and Lookahead derive from
48 // this class and implement findJob()
49 class JobProvider
50 {
51 public:
52 
53     ThreadPool*   m_pool;
54     sleepbitmap_t m_ownerBitmap;
55     int           m_jpId;
56     int           m_sliceType;
57     bool          m_helpWanted;
58     bool          m_isFrameEncoder; /* rather ugly hack, but nothing better presents itself */
59 
JobProvider()60     JobProvider()
61         : m_pool(NULL)
62         , m_ownerBitmap(0)
63         , m_jpId(-1)
64         , m_sliceType(INVALID_SLICE_PRIORITY)
65         , m_helpWanted(false)
66         , m_isFrameEncoder(false)
67     {}
68 
~JobProvider()69     virtual ~JobProvider() {}
70 
71     // Worker threads will call this method to perform work
72     virtual void findJob(int workerThreadId) = 0;
73 
74     // Will awaken one idle thread, preferring a thread which most recently
75     // performed work for this provider.
76     void tryWakeOne();
77 };
78 
79 class ThreadPool
80 {
81 public:
82 
83     sleepbitmap_t m_sleepBitmap;
84     int           m_numProviders;
85     int           m_numWorkers;
86     void*         m_numaMask; // node mask in linux, cpu mask in windows
87 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
88     GROUP_AFFINITY m_groupAffinity;
89 #endif
90     bool          m_isActive;
91 
92     JobProvider** m_jpTable;
93     WorkerThread* m_workers;
94 
95     ThreadPool();
96     ~ThreadPool();
97 
98     bool create(int numThreads, int maxProviders, uint64_t nodeMask);
99     bool start();
100     void stopWorkers();
101     void setCurrentThreadAffinity();
102     void setThreadNodeAffinity(void *numaMask);
103     int  tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap);
104     int  tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master);
105     static ThreadPool* allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved);
106     static int  getCpuCount();
107     static int  getNumaNodeCount();
108     static void getFrameThreadsCount(x265_param* p,int cpuCount);
109 };
110 
111 /* Any worker thread may enlist the help of idle worker threads from the same
112  * job provider. They must derive from this class and implement the
113  * processTasks() method.  To use, an instance must be instantiated by a worker
114  * thread (referred to as the master thread) and then tryBondPeers() must be
115  * called. If it returns non-zero then some number of slave worker threads are
116  * already in the process of calling your processTasks() function. The master
117  * thread should participate and call processTasks() itself. When
118  * waitForExit() returns, all bonded peer threads are guaranteed to have
119  * exitied processTasks(). Since the thread count is small, it uses explicit
120  * locking instead of atomic counters and bitmasks */
121 class BondedTaskGroup
122 {
123 public:
124 
125     Lock              m_lock;
126     ThreadSafeInteger m_exitedPeerCount;
127     int               m_bondedPeerCount;
128     int               m_jobTotal;
129     int               m_jobAcquired;
130 
BondedTaskGroup()131     BondedTaskGroup()  { m_bondedPeerCount = m_jobTotal = m_jobAcquired = 0; }
132 
133     /* Do not allow the instance to be destroyed before all bonded peers have
134      * exited processTasks() */
~BondedTaskGroup()135     ~BondedTaskGroup() { waitForExit(); }
136 
137     /* Try to enlist the help of idle worker threads on most recently associated
138      * with the given job provider and "bond" them to work on your tasks. Up to
139      * maxPeers worker threads will call your processTasks() method. */
tryBondPeers(JobProvider & jp,int maxPeers)140     int tryBondPeers(JobProvider& jp, int maxPeers)
141     {
142         int count = jp.m_pool->tryBondPeers(maxPeers, jp.m_ownerBitmap, *this);
143         m_bondedPeerCount += count;
144         return count;
145     }
146 
147     /* Try to enlist the help of any idle worker threads and "bond" them to work
148      * on your tasks. Up to maxPeers worker threads will call your
149      * processTasks() method. */
tryBondPeers(ThreadPool & pool,int maxPeers)150     int tryBondPeers(ThreadPool& pool, int maxPeers)
151     {
152         int count = pool.tryBondPeers(maxPeers, ALL_POOL_THREADS, *this);
153         m_bondedPeerCount += count;
154         return count;
155     }
156 
157     /* Returns when all bonded peers have exited processTasks(). It does *NOT*
158      * ensure all tasks are completed (but this is generally implied). */
waitForExit()159     void waitForExit()
160     {
161         int exited = m_exitedPeerCount.get();
162         while (m_bondedPeerCount != exited)
163             exited = m_exitedPeerCount.waitForChange(exited);
164     }
165 
166     /* Derived classes must define this method. The worker thread ID may be
167      * used to index into thread local data, or ignored.  The ID will be between
168      * 0 and jp.m_numWorkers - 1 */
169     virtual void processTasks(int workerThreadId) = 0;
170 };
171 
172 } // end namespace X265_NS
173 
174 #endif // ifndef X265_THREADPOOL_H
175