1 /*****************************************************************************
2  * Copyright (C) 2013 x265 project
3  *
4  * Authors: Steve Borho <steve@borho.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
19  *
20  * This program is also available under a commercial proprietary license.
21  * For more information, contact us at license @ x265.com
22  *****************************************************************************/
23 
24 #include "common.h"
25 #include "threadpool.h"
26 #include "threading.h"
27 
28 #include <new>
29 
#if X86_64

#ifdef __GNUC__

#define SLEEPBITMAP_CTZ(id, x)     id = (unsigned long)__builtin_ctzll(x)
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

#define SLEEPBITMAP_CTZ(id, x)     _BitScanForward64(&id, x)
/* BUG FIX: the mask must be cast to the 64-bit LONG64, not the 32-bit LONG;
 * the old (LONG)mask cast silently truncated the sleep bitmap, corrupting
 * the bits of any worker thread with an id >= 32 */
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_CTZ CTZ
#define SLEEPBITMAP_OR  ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
54 
55 #if MACOS
56 #include <sys/param.h>
57 #include <sys/sysctl.h>
58 #endif
59 #if HAVE_LIBNUMA
60 #include <numa.h>
61 #endif
62 
63 namespace X265_NS {
64 // x265 private namespace
65 
/* A pool worker thread: sleeps with its bit set in the pool's sleep bitmap
 * until a job provider or bonded task group acquires that bit and wakes it,
 * then services work until none is wanted (see threadMain) */
class WorkerThread : public Thread
{
private:

    ThreadPool&  m_pool;       // owning pool
    int          m_id;         // worker slot index; also its bit position in the sleep/owner bitmaps
    Event        m_wakeEvent;  // signaled by awaken() to release a parked worker

    WorkerThread& operator =(const WorkerThread&);

public:

    JobProvider*     m_curJobProvider;  // provider this worker is currently registered with
    BondedTaskGroup* m_bondMaster;      // non-NULL when bonded to a task group; set while the worker sleeps

    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
    virtual ~WorkerThread() {}

    void threadMain();
    void awaken()           { m_wakeEvent.trigger(); }
};
87 
/* Worker thread entry point.  The worker alternately (a) services a bonded
 * task group if one claimed it while asleep, (b) drains jobs from job
 * providers in slice-type priority order, and (c) parks on m_wakeEvent with
 * its bit published in the pool's sleep bitmap.  Runs until m_isActive is
 * cleared by stopWorkers(). */
void WorkerThread::threadMain()
{
    THREAD_NAME("Worker", m_id);

#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    /* lower scheduling priority so worker threads yield to other threads */
    __attribute__((unused)) int val = nice(10);
#endif

    m_pool.setCurrentThreadAffinity();

    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    m_curJobProvider = m_pool.m_jpTable[0];
    m_bondMaster = NULL;

    /* register with the first provider, publish our sleep bit, and park
     * until first wakeup; the bit must be set before waiting so wakers can
     * acquire this thread */
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    m_wakeEvent.wait();

    while (m_pool.m_isActive)
    {
        if (m_bondMaster)
        {
            /* we were claimed by a bonded task group while asleep; run its
             * tasks and signal our exit before resuming normal job service */
            m_bondMaster->processTasks(m_id);
            m_bondMaster->m_exitedPeerCount.incr();
            m_bondMaster = NULL;
        }

        do
        {
            /* do pending work for current job provider */
            m_curJobProvider->findJob(m_id);

            /* if the current job provider still wants help, only switch to a
             * higher priority provider (lower slice type). Else take the first
             * available job provider with the highest priority */
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
                                                                 INVALID_SLICE_PRIORITY + 1;
            int nextProvider = -1;
            for (int i = 0; i < m_pool.m_numProviders; i++)
            {
                if (m_pool.m_jpTable[i]->m_helpWanted &&
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
                {
                    nextProvider = i;
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
                }
            }
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
            {
                /* migrate our ownership bit from the old provider to the new */
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
            }
        }
        while (m_curJobProvider->m_helpWanted);

        /* While the worker sleeps, a job-provider or bond-group may acquire this
         * worker's sleep bitmap bit. Once acquired, that thread may modify
         * m_bondMaster or m_curJobProvider, then waken the thread */
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
        m_wakeEvent.wait();
    }

    /* publish our sleep bit one last time so stopWorkers() can observe that
     * this thread has left its main loop */
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
155 
/* Try to wake one sleeping worker to service this provider's jobs.  Workers
 * already registered with this provider (m_ownerBitmap) are preferred; any
 * other sleeping pool thread may be poached.  If no worker is asleep, set
 * m_helpWanted so a busy worker picks us up on its next provider scan. */
void JobProvider::tryWakeOne()
{
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
    if (id < 0)
    {
        /* no sleeping workers available; flag for later pickup */
        m_helpWanted = true;
        return;
    }

    WorkerThread& worker = m_pool->m_workers[id];
    if (worker.m_curJobProvider != this) /* poaching */
    {
        /* we exclusively own this worker's sleep bit (tryAcquireSleepingThread
         * cleared it atomically), so re-registering the worker is safe */
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
        worker.m_curJobProvider = this;
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
    }
    worker.awaken();
}
175 
tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap,sleepbitmap_t secondTryBitmap)176 int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
177 {
178     unsigned long id;
179 
180     sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap;
181     while (masked)
182     {
183         SLEEPBITMAP_CTZ(id, masked);
184 
185         sleepbitmap_t bit = (sleepbitmap_t)1 << id;
186         if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
187             return (int)id;
188 
189         masked = m_sleepBitmap & firstTryBitmap;
190     }
191 
192     masked = m_sleepBitmap & secondTryBitmap;
193     while (masked)
194     {
195         SLEEPBITMAP_CTZ(id, masked);
196 
197         sleepbitmap_t bit = (sleepbitmap_t)1 << id;
198         if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
199             return (int)id;
200 
201         masked = m_sleepBitmap & secondTryBitmap;
202     }
203 
204     return -1;
205 }
206 
tryBondPeers(int maxPeers,sleepbitmap_t peerBitmap,BondedTaskGroup & master)207 int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
208 {
209     int bondCount = 0;
210     do
211     {
212         int id = tryAcquireSleepingThread(peerBitmap, 0);
213         if (id < 0)
214             return bondCount;
215 
216         m_workers[id].m_bondMaster = &master;
217         m_workers[id].awaken();
218         bondCount++;
219     }
220     while (bondCount < maxPeers);
221 
222     return bondCount;
223 }
224 
allocThreadPools(x265_param * p,int & numPools)225 ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools)
226 {
227     enum { MAX_NODE_NUM = 127 };
228     int cpusPerNode[MAX_NODE_NUM + 1];
229     int threadsPerPool[MAX_NODE_NUM + 2];
230     uint32_t nodeMaskPerPool[MAX_NODE_NUM +2];
231 
232     memset(cpusPerNode, 0, sizeof(cpusPerNode));
233     memset(threadsPerPool, 0, sizeof(threadsPerPool));
234     memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));
235 
236     int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
237     int cpuCount = getCpuCount();
238     bool bNumaSupport = false;
239 
240 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
241     bNumaSupport = true;
242 #elif HAVE_LIBNUMA
243     bNumaSupport = numa_available() >= 0;
244 #endif
245 
246 
247     for (int i = 0; i < cpuCount; i++)
248     {
249 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
250         UCHAR node;
251         if (GetNumaProcessorNode((UCHAR)i, &node))
252             cpusPerNode[X265_MIN(node, (UCHAR)MAX_NODE_NUM)]++;
253         else
254 #elif HAVE_LIBNUMA
255         if (bNumaSupport >= 0)
256             cpusPerNode[X265_MIN(numa_node_of_cpu(i), MAX_NODE_NUM)]++;
257         else
258 #endif
259             cpusPerNode[0]++;
260     }
261 
262     if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
263         for (int i = 0; i < numNumaNodes; i++)
264             x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
265 
266     /* limit threads based on param->numaPools */
267     if (p->numaPools && *p->numaPools)
268     {
269         const char *nodeStr = p->numaPools;
270         for (int i = 0; i < numNumaNodes; i++)
271         {
272             if (!*nodeStr)
273             {
274                 threadsPerPool[i] = 0;
275                 continue;
276             }
277             else if (*nodeStr == '-')
278                 threadsPerPool[i] = 0;
279             else if (*nodeStr == '*')
280             {
281                 for (int j = i; j < numNumaNodes; j++)
282                 {
283                     threadsPerPool[numNumaNodes] += cpusPerNode[j];
284                     nodeMaskPerPool[numNumaNodes] |= (1U << j);
285                 }
286                 break;
287             }
288             else if (*nodeStr == '+')
289             {
290                 threadsPerPool[numNumaNodes] += cpusPerNode[i];
291                 nodeMaskPerPool[numNumaNodes] = (1U << i);
292             }
293             else
294             {
295                 int count = atoi(nodeStr);
296                 threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
297                 nodeMaskPerPool[i] = (1U << i);
298             }
299 
300             /* consume current node string, comma, and white-space */
301             while (*nodeStr && *nodeStr != ',')
302                ++nodeStr;
303             if (*nodeStr == ',' || *nodeStr == ' ')
304                ++nodeStr;
305         }
306     }
307     else
308     {
309         for (int i = 0; i < numNumaNodes; i++)
310         {
311             threadsPerPool[numNumaNodes]  += cpusPerNode[i];
312             nodeMaskPerPool[numNumaNodes] |= (1U << i);
313         }
314     }
315 
316     // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
317     if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
318         ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
319     {
320         threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
321         x265_log(p, X265_LOG_DEBUG,
322                  "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
323     }
324 
325     numPools = 0;
326     for (int i = 0; i < numNumaNodes + 1; i++)
327     {
328         if (bNumaSupport)
329             x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
330         if (threadsPerPool[i])
331             numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
332     }
333 
334     if (!numPools)
335         return NULL;
336 
337     if (numPools > p->frameNumThreads)
338     {
339         x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
340         numPools = X265_MAX(p->frameNumThreads / 2, 1);
341     }
342 
343     ThreadPool *pools = new ThreadPool[numPools];
344     if (pools)
345     {
346         int maxProviders = (p->frameNumThreads + numPools - 1) / numPools + 1; /* +1 is Lookahead, always assigned to threadpool 0 */
347         int node = 0;
348         for (int i = 0; i < numPools; i++)
349         {
350             while (!threadsPerPool[node])
351                 node++;
352             int numThreads = X265_MIN(MAX_POOL_THREADS, threadsPerPool[node]);
353             if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
354             {
355                 X265_FREE(pools);
356                 numPools = 0;
357                 return NULL;
358             }
359             if (numNumaNodes > 1)
360                 x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads with NUMA node mask %lx\n", i, numThreads, nodeMaskPerPool[node]);
361             else
362                 x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
363             threadsPerPool[node] -= numThreads;
364         }
365     }
366     else
367         numPools = 0;
368     return pools;
369 }
370 
/* Zero-initialize every member so that create()/the destructor can safely
 * test pointers against NULL.  NOTE(review): memset over *this assumes
 * ThreadPool is a trivially-constructible POD with no virtual functions or
 * non-trivial members — confirm against the declaration in threadpool.h */
ThreadPool::ThreadPool()
{
    memset(this, 0, sizeof(*this));
}
375 
/* One-time pool initialization: build this pool's NUMA affinity mask from
 * nodeMask, allocate the worker array (raw memory + placement new) and the
 * job provider table.  Workers are not started here — see start().
 * Returns false on allocation failure. */
bool ThreadPool::create(int numThreads, int maxProviders, uint32_t nodeMask)
{
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    m_winCpuMask = 0x0;
    GROUP_AFFINITY groupAffinity;
    /* OR together the processor masks of every NUMA node selected in nodeMask */
    for (int i = 0; i < getNumaNodeCount(); i++)
    {
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
        if (numaNode != -1)
            if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &groupAffinity))
                m_winCpuMask |= groupAffinity.Mask;
    }
    m_numaMask = &m_winCpuMask;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        struct bitmask* nodemask = numa_allocate_nodemask();
        if (nodemask)
        {
            /* NOTE(review): writes only the first word of the libnuma mask,
             * which assumes <= 8*sizeof(unsigned long) NUMA nodes — confirm */
            *(nodemask->maskp) = nodeMask;
            m_numaMask = nodemask;
        }
        else
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
    }
#else
    (void)nodeMask;
#endif

    m_numWorkers = numThreads;

    m_workers = X265_MALLOC(WorkerThread, numThreads);
    /* placement new initialization */
    if (m_workers)
        for (int i = 0; i < numThreads; i++)
            new (m_workers + i)WorkerThread(*this, i);

    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
    m_numProviders = 0;

    return m_workers && m_jpTable;
}
420 
start()421 bool ThreadPool::start()
422 {
423     m_isActive = true;
424     for (int i = 0; i < m_numWorkers; i++)
425     {
426         if (!m_workers[i].start())
427         {
428             m_isActive = false;
429             return false;
430         }
431     }
432     return true;
433 }
434 
/* Deactivate the pool and reap every worker.  For each worker we spin until
 * its sleep bit is visible (meaning it is parked in m_wakeEvent.wait() or has
 * already exited its main loop), then wake it so it observes m_isActive ==
 * false, and join it via stop(). */
void ThreadPool::stopWorkers()
{
    if (m_workers)
    {
        m_isActive = false;
        for (int i = 0; i < m_numWorkers; i++)
        {
            /* busy-wait for the worker to park; GIVE_UP_TIME yields the CPU */
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
                GIVE_UP_TIME();
            m_workers[i].awaken();
            m_workers[i].stop();
        }
    }
}
449 
/* Tear down the pool.  Worker objects were built with X265_MALLOC plus
 * placement new (see create()), so their destructors must be invoked
 * explicitly before the raw buffer is freed.  Also releases the libnuma
 * node mask if one was allocated.  NOTE(review): threads are not stopped
 * here — callers appear responsible for stopWorkers() first; confirm. */
ThreadPool::~ThreadPool()
{
    if (m_workers)
    {
        for (int i = 0; i < m_numWorkers; i++)
            m_workers[i].~WorkerThread();
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if(m_numaMask)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
466 
/* Bind the calling thread to this pool's NUMA node mask (set up in create()) */
void ThreadPool::setCurrentThreadAffinity()
{
    setThreadNodeAffinity(m_numaMask);
}
471 
472 /* static */
/* Bind the calling thread to the given NUMA mask.  The mask's concrete type
 * is platform-dependent: a pointer to a DWORD CPU mask on Windows 7+, a
 * libnuma 'struct bitmask' when HAVE_LIBNUMA, otherwise ignored. */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    if (SetThreadAffinityMask(GetCurrentThread(), (DWORD_PTR)(*((DWORD*)numaMask))))
        return;
    else
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        /* restrict execution to the requested nodes and set interleaved /
         * node-local memory allocation policy for this thread */
        numa_run_on_node_mask((struct bitmask*)numaMask);
        numa_set_interleave_mask((struct bitmask*)numaMask);
        numa_set_localalloc();
        return;
    }
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#else
    (void)numaMask;
#endif
    return;
}
494 
495 /* static */
/* Return the number of NUMA nodes on this machine; 1 when no NUMA support
 * is compiled in or detected at runtime */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    ULONG num = 1;
    if (GetNumaHighestNodeNumber(&num))
        num++;   /* API yields the highest node index, so count = index + 1 */
    return (int)num;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
        return numa_max_node() + 1;
    else
        return 1;
#else
    return 1;
#endif
}
512 
513 /* static */
/* Best-effort count of logical processors available to this process;
 * falls back to 2 on platforms with no detection API */
int ThreadPool::getCpuCount()
{
#if _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__
    return sysconf(_SC_NPROCESSORS_ONLN);
#elif MACOS
    int nm[2];
    size_t len = 4;      /* sizeof(uint32_t), the size of 'count' below */
    uint32_t count;

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);

    if (count < 1)
    {
        /* HW_AVAILCPU gave nothing useful; retry with the older HW_NCPU key */
        nm[1] = HW_NCPU;
        sysctl(nm, 2, &count, &len, NULL, 0);
        if (count < 1)
            count = 1;
    }

    return count;
#else
    return 2; // default to 2 threads, everywhere else
#endif
}
544 
545 } // end namespace X265_NS
546