1 /***************************************************************************** 2 * Copyright (C) 2013-2020 MulticoreWare, Inc 3 * 4 * Authors: Steve Borho <steve@borho.org> 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation; either version 2 of the License, or 9 * (at your option) any later version. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with this program; if not, write to the Free Software 18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA. 19 * 20 * This program is also available under a commercial proprietary license. 21 * For more information, contact us at license @ x265.com 22 *****************************************************************************/ 23 24 #ifndef X265_THREADPOOL_H 25 #define X265_THREADPOOL_H 26 27 #include "common.h" 28 #include "threading.h" 29 30 namespace X265_NS { 31 // x265 private namespace 32 33 class ThreadPool; 34 class WorkerThread; 35 class BondedTaskGroup; 36 37 #if X86_64 38 typedef uint64_t sleepbitmap_t; 39 #else 40 typedef uint32_t sleepbitmap_t; 41 #endif 42 43 static const sleepbitmap_t ALL_POOL_THREADS = (sleepbitmap_t)-1; 44 enum { MAX_POOL_THREADS = sizeof(sleepbitmap_t) * 8 }; 45 enum { INVALID_SLICE_PRIORITY = 10 }; // a value larger than any X265_TYPE_* macro 46 47 // Frame level job providers. FrameEncoder and Lookahead derive from 48 // this class and implement findJob() 49 class JobProvider 50 { 51 public: 52 53 ThreadPool* m_pool; 54 sleepbitmap_t m_ownerBitmap; 55 int m_jpId; 56 int m_sliceType; 57 bool m_helpWanted; 58 bool m_isFrameEncoder; /* rather ugly hack, but nothing better presents itself */ 59 JobProvider()60 JobProvider() 61 : m_pool(NULL) 62 , m_ownerBitmap(0) 63 , m_jpId(-1) 64 , m_sliceType(INVALID_SLICE_PRIORITY) 65 , m_helpWanted(false) 66 , m_isFrameEncoder(false) 67 {} 68 ~JobProvider()69 virtual ~JobProvider() {} 70 71 // Worker threads will call this method to perform work 72 virtual void findJob(int workerThreadId) = 0; 73 74 // Will awaken one idle thread, preferring a thread which most recently 75 // performed work for this provider. 76 void tryWakeOne(); 77 }; 78 79 class ThreadPool 80 { 81 public: 82 83 sleepbitmap_t m_sleepBitmap; 84 int m_numProviders; 85 int m_numWorkers; 86 void* m_numaMask; // node mask in linux, cpu mask in windows 87 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 88 GROUP_AFFINITY m_groupAffinity; 89 #endif 90 bool m_isActive; 91 92 JobProvider** m_jpTable; 93 WorkerThread* m_workers; 94 95 ThreadPool(); 96 ~ThreadPool(); 97 98 bool create(int numThreads, int maxProviders, uint64_t nodeMask); 99 bool start(); 100 void stopWorkers(); 101 void setCurrentThreadAffinity(); 102 void setThreadNodeAffinity(void *numaMask); 103 int tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap); 104 int tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master); 105 static ThreadPool* allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved); 106 static int getCpuCount(); 107 static int getNumaNodeCount(); 108 static void getFrameThreadsCount(x265_param* p,int cpuCount); 109 }; 110 111 /* Any worker thread may enlist the help of idle worker threads from the same 112 * job provider. They must derive from this class and implement the 113 * processTasks() method. To use, an instance must be instantiated by a worker 114 * thread (referred to as the master thread) and then tryBondPeers() must be 115 * called. If it returns non-zero then some number of slave worker threads are 116 * already in the process of calling your processTasks() function. The master 117 * thread should participate and call processTasks() itself. When 118 * waitForExit() returns, all bonded peer threads are guaranteed to have 119 * exitied processTasks(). Since the thread count is small, it uses explicit 120 * locking instead of atomic counters and bitmasks */ 121 class BondedTaskGroup 122 { 123 public: 124 125 Lock m_lock; 126 ThreadSafeInteger m_exitedPeerCount; 127 int m_bondedPeerCount; 128 int m_jobTotal; 129 int m_jobAcquired; 130 BondedTaskGroup()131 BondedTaskGroup() { m_bondedPeerCount = m_jobTotal = m_jobAcquired = 0; } 132 133 /* Do not allow the instance to be destroyed before all bonded peers have 134 * exited processTasks() */ ~BondedTaskGroup()135 ~BondedTaskGroup() { waitForExit(); } 136 137 /* Try to enlist the help of idle worker threads on most recently associated 138 * with the given job provider and "bond" them to work on your tasks. Up to 139 * maxPeers worker threads will call your processTasks() method. */ tryBondPeers(JobProvider & jp,int maxPeers)140 int tryBondPeers(JobProvider& jp, int maxPeers) 141 { 142 int count = jp.m_pool->tryBondPeers(maxPeers, jp.m_ownerBitmap, *this); 143 m_bondedPeerCount += count; 144 return count; 145 } 146 147 /* Try to enlist the help of any idle worker threads and "bond" them to work 148 * on your tasks. Up to maxPeers worker threads will call your 149 * processTasks() method. */ tryBondPeers(ThreadPool & pool,int maxPeers)150 int tryBondPeers(ThreadPool& pool, int maxPeers) 151 { 152 int count = pool.tryBondPeers(maxPeers, ALL_POOL_THREADS, *this); 153 m_bondedPeerCount += count; 154 return count; 155 } 156 157 /* Returns when all bonded peers have exited processTasks(). It does *NOT* 158 * ensure all tasks are completed (but this is generally implied). */ waitForExit()159 void waitForExit() 160 { 161 int exited = m_exitedPeerCount.get(); 162 while (m_bondedPeerCount != exited) 163 exited = m_exitedPeerCount.waitForChange(exited); 164 } 165 166 /* Derived classes must define this method. The worker thread ID may be 167 * used to index into thread local data, or ignored. The ID will be between 168 * 0 and jp.m_numWorkers - 1 */ 169 virtual void processTasks(int workerThreadId) = 0; 170 }; 171 172 } // end namespace X265_NS 173 174 #endif // ifndef X265_THREADPOOL_H 175