1 /*****************************************************************************
2 * Copyright (C) 2013 x265 project
3 *
4 * Authors: Steve Borho <steve@borho.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
19 *
20 * This program is also available under a commercial proprietary license.
21 * For more information, contact us at license @ x265.com
22 *****************************************************************************/
23
24 #include "common.h"
25 #include "threadpool.h"
26 #include "threading.h"
27
28 #include <new>
29
#if X86_64

#ifdef __GNUC__

#define SLEEPBITMAP_CTZ(id, x) id = (unsigned long)__builtin_ctzll(x)
#define SLEEPBITMAP_OR(ptr, mask) __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

#define SLEEPBITMAP_CTZ(id, x) _BitScanForward64(&id, x)
/* The mask must stay 64 bits wide: a (LONG) cast here would truncate the
 * upper 32 bits of the sleep bitmap, silently losing workers 32..63 on
 * pools with more than 32 threads */
#define SLEEPBITMAP_OR(ptr, mask) InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_CTZ CTZ
#define SLEEPBITMAP_OR ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
54
55 #if MACOS
56 #include <sys/param.h>
57 #include <sys/sysctl.h>
58 #endif
59 #if HAVE_LIBNUMA
60 #include <numa.h>
61 #endif
62
63 namespace X265_NS {
64 // x265 private namespace
65
/* Pool worker thread: sleeps on its wake event until a job provider or
 * bonded task group acquires it, then runs work via threadMain() */
class WorkerThread : public Thread
{
private:

    ThreadPool& m_pool;     // owning pool; fixed for the thread's lifetime
    int m_id;               // worker index within the pool, selects its bitmap bit
    Event m_wakeEvent;      // signaled by awaken() to rouse a sleeping worker

    WorkerThread& operator =(const WorkerThread&);  // non-assignable (reference member)

public:

    /* written by other threads while this worker sleeps (they hold its sleep bit) */
    JobProvider* m_curJobProvider;   // provider this worker currently serves
    BondedTaskGroup* m_bondMaster;   // non-NULL when bonded to a task group

    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
    virtual ~WorkerThread() {}

    void threadMain();
    void awaken() { m_wakeEvent.trigger(); }
};
87
threadMain()88 void WorkerThread::threadMain()
89 {
90 THREAD_NAME("Worker", m_id);
91
92 #if _WIN32
93 SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
94 #else
95 __attribute__((unused)) int val = nice(10);
96 #endif
97
98 m_pool.setCurrentThreadAffinity();
99
100 sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
101 m_curJobProvider = m_pool.m_jpTable[0];
102 m_bondMaster = NULL;
103
104 SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
105 SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
106 m_wakeEvent.wait();
107
108 while (m_pool.m_isActive)
109 {
110 if (m_bondMaster)
111 {
112 m_bondMaster->processTasks(m_id);
113 m_bondMaster->m_exitedPeerCount.incr();
114 m_bondMaster = NULL;
115 }
116
117 do
118 {
119 /* do pending work for current job provider */
120 m_curJobProvider->findJob(m_id);
121
122 /* if the current job provider still wants help, only switch to a
123 * higher priority provider (lower slice type). Else take the first
124 * available job provider with the highest priority */
125 int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
126 INVALID_SLICE_PRIORITY + 1;
127 int nextProvider = -1;
128 for (int i = 0; i < m_pool.m_numProviders; i++)
129 {
130 if (m_pool.m_jpTable[i]->m_helpWanted &&
131 m_pool.m_jpTable[i]->m_sliceType < curPriority)
132 {
133 nextProvider = i;
134 curPriority = m_pool.m_jpTable[i]->m_sliceType;
135 }
136 }
137 if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
138 {
139 SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
140 m_curJobProvider = m_pool.m_jpTable[nextProvider];
141 SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
142 }
143 }
144 while (m_curJobProvider->m_helpWanted);
145
146 /* While the worker sleeps, a job-provider or bond-group may acquire this
147 * worker's sleep bitmap bit. Once acquired, that thread may modify
148 * m_bondMaster or m_curJobProvider, then waken the thread */
149 SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
150 m_wakeEvent.wait();
151 }
152
153 SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
154 }
155
tryWakeOne()156 void JobProvider::tryWakeOne()
157 {
158 int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
159 if (id < 0)
160 {
161 m_helpWanted = true;
162 return;
163 }
164
165 WorkerThread& worker = m_pool->m_workers[id];
166 if (worker.m_curJobProvider != this) /* poaching */
167 {
168 sleepbitmap_t bit = (sleepbitmap_t)1 << id;
169 SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
170 worker.m_curJobProvider = this;
171 SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
172 }
173 worker.awaken();
174 }
175
tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap,sleepbitmap_t secondTryBitmap)176 int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
177 {
178 unsigned long id;
179
180 sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap;
181 while (masked)
182 {
183 SLEEPBITMAP_CTZ(id, masked);
184
185 sleepbitmap_t bit = (sleepbitmap_t)1 << id;
186 if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
187 return (int)id;
188
189 masked = m_sleepBitmap & firstTryBitmap;
190 }
191
192 masked = m_sleepBitmap & secondTryBitmap;
193 while (masked)
194 {
195 SLEEPBITMAP_CTZ(id, masked);
196
197 sleepbitmap_t bit = (sleepbitmap_t)1 << id;
198 if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
199 return (int)id;
200
201 masked = m_sleepBitmap & secondTryBitmap;
202 }
203
204 return -1;
205 }
206
tryBondPeers(int maxPeers,sleepbitmap_t peerBitmap,BondedTaskGroup & master)207 int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
208 {
209 int bondCount = 0;
210 do
211 {
212 int id = tryAcquireSleepingThread(peerBitmap, 0);
213 if (id < 0)
214 return bondCount;
215
216 m_workers[id].m_bondMaster = &master;
217 m_workers[id].awaken();
218 bondCount++;
219 }
220 while (bondCount < maxPeers);
221
222 return bondCount;
223 }
224
allocThreadPools(x265_param * p,int & numPools)225 ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools)
226 {
227 enum { MAX_NODE_NUM = 127 };
228 int cpusPerNode[MAX_NODE_NUM + 1];
229 int threadsPerPool[MAX_NODE_NUM + 2];
230 uint32_t nodeMaskPerPool[MAX_NODE_NUM +2];
231
232 memset(cpusPerNode, 0, sizeof(cpusPerNode));
233 memset(threadsPerPool, 0, sizeof(threadsPerPool));
234 memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));
235
236 int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
237 int cpuCount = getCpuCount();
238 bool bNumaSupport = false;
239
240 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
241 bNumaSupport = true;
242 #elif HAVE_LIBNUMA
243 bNumaSupport = numa_available() >= 0;
244 #endif
245
246
247 for (int i = 0; i < cpuCount; i++)
248 {
249 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
250 UCHAR node;
251 if (GetNumaProcessorNode((UCHAR)i, &node))
252 cpusPerNode[X265_MIN(node, (UCHAR)MAX_NODE_NUM)]++;
253 else
254 #elif HAVE_LIBNUMA
255 if (bNumaSupport >= 0)
256 cpusPerNode[X265_MIN(numa_node_of_cpu(i), MAX_NODE_NUM)]++;
257 else
258 #endif
259 cpusPerNode[0]++;
260 }
261
262 if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
263 for (int i = 0; i < numNumaNodes; i++)
264 x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
265
266 /* limit threads based on param->numaPools */
267 if (p->numaPools && *p->numaPools)
268 {
269 const char *nodeStr = p->numaPools;
270 for (int i = 0; i < numNumaNodes; i++)
271 {
272 if (!*nodeStr)
273 {
274 threadsPerPool[i] = 0;
275 continue;
276 }
277 else if (*nodeStr == '-')
278 threadsPerPool[i] = 0;
279 else if (*nodeStr == '*')
280 {
281 for (int j = i; j < numNumaNodes; j++)
282 {
283 threadsPerPool[numNumaNodes] += cpusPerNode[j];
284 nodeMaskPerPool[numNumaNodes] |= (1U << j);
285 }
286 break;
287 }
288 else if (*nodeStr == '+')
289 {
290 threadsPerPool[numNumaNodes] += cpusPerNode[i];
291 nodeMaskPerPool[numNumaNodes] = (1U << i);
292 }
293 else
294 {
295 int count = atoi(nodeStr);
296 threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
297 nodeMaskPerPool[i] = (1U << i);
298 }
299
300 /* consume current node string, comma, and white-space */
301 while (*nodeStr && *nodeStr != ',')
302 ++nodeStr;
303 if (*nodeStr == ',' || *nodeStr == ' ')
304 ++nodeStr;
305 }
306 }
307 else
308 {
309 for (int i = 0; i < numNumaNodes; i++)
310 {
311 threadsPerPool[numNumaNodes] += cpusPerNode[i];
312 nodeMaskPerPool[numNumaNodes] |= (1U << i);
313 }
314 }
315
316 // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
317 if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
318 ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
319 {
320 threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
321 x265_log(p, X265_LOG_DEBUG,
322 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
323 }
324
325 numPools = 0;
326 for (int i = 0; i < numNumaNodes + 1; i++)
327 {
328 if (bNumaSupport)
329 x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
330 if (threadsPerPool[i])
331 numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
332 }
333
334 if (!numPools)
335 return NULL;
336
337 if (numPools > p->frameNumThreads)
338 {
339 x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
340 numPools = X265_MAX(p->frameNumThreads / 2, 1);
341 }
342
343 ThreadPool *pools = new ThreadPool[numPools];
344 if (pools)
345 {
346 int maxProviders = (p->frameNumThreads + numPools - 1) / numPools + 1; /* +1 is Lookahead, always assigned to threadpool 0 */
347 int node = 0;
348 for (int i = 0; i < numPools; i++)
349 {
350 while (!threadsPerPool[node])
351 node++;
352 int numThreads = X265_MIN(MAX_POOL_THREADS, threadsPerPool[node]);
353 if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
354 {
355 X265_FREE(pools);
356 numPools = 0;
357 return NULL;
358 }
359 if (numNumaNodes > 1)
360 x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads with NUMA node mask %lx\n", i, numThreads, nodeMaskPerPool[node]);
361 else
362 x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
363 threadsPerPool[node] -= numThreads;
364 }
365 }
366 else
367 numPools = 0;
368 return pools;
369 }
370
ThreadPool()371 ThreadPool::ThreadPool()
372 {
373 memset(this, 0, sizeof(*this));
374 }
375
create(int numThreads,int maxProviders,uint32_t nodeMask)376 bool ThreadPool::create(int numThreads, int maxProviders, uint32_t nodeMask)
377 {
378 X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");
379
380 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
381 m_winCpuMask = 0x0;
382 GROUP_AFFINITY groupAffinity;
383 for (int i = 0; i < getNumaNodeCount(); i++)
384 {
385 int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
386 if (numaNode != -1)
387 if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &groupAffinity))
388 m_winCpuMask |= groupAffinity.Mask;
389 }
390 m_numaMask = &m_winCpuMask;
391 #elif HAVE_LIBNUMA
392 if (numa_available() >= 0)
393 {
394 struct bitmask* nodemask = numa_allocate_nodemask();
395 if (nodemask)
396 {
397 *(nodemask->maskp) = nodeMask;
398 m_numaMask = nodemask;
399 }
400 else
401 x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
402 }
403 #else
404 (void)nodeMask;
405 #endif
406
407 m_numWorkers = numThreads;
408
409 m_workers = X265_MALLOC(WorkerThread, numThreads);
410 /* placement new initialization */
411 if (m_workers)
412 for (int i = 0; i < numThreads; i++)
413 new (m_workers + i)WorkerThread(*this, i);
414
415 m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
416 m_numProviders = 0;
417
418 return m_workers && m_jpTable;
419 }
420
start()421 bool ThreadPool::start()
422 {
423 m_isActive = true;
424 for (int i = 0; i < m_numWorkers; i++)
425 {
426 if (!m_workers[i].start())
427 {
428 m_isActive = false;
429 return false;
430 }
431 }
432 return true;
433 }
434
stopWorkers()435 void ThreadPool::stopWorkers()
436 {
437 if (m_workers)
438 {
439 m_isActive = false;
440 for (int i = 0; i < m_numWorkers; i++)
441 {
442 while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
443 GIVE_UP_TIME();
444 m_workers[i].awaken();
445 m_workers[i].stop();
446 }
447 }
448 }
449
~ThreadPool()450 ThreadPool::~ThreadPool()
451 {
452 if (m_workers)
453 {
454 for (int i = 0; i < m_numWorkers; i++)
455 m_workers[i].~WorkerThread();
456 }
457
458 X265_FREE(m_workers);
459 X265_FREE(m_jpTable);
460
461 #if HAVE_LIBNUMA
462 if(m_numaMask)
463 numa_free_nodemask((struct bitmask*)m_numaMask);
464 #endif
465 }
466
setCurrentThreadAffinity()467 void ThreadPool::setCurrentThreadAffinity()
468 {
469 setThreadNodeAffinity(m_numaMask);
470 }
471
472 /* static */
setThreadNodeAffinity(void * numaMask)473 void ThreadPool::setThreadNodeAffinity(void *numaMask)
474 {
475 #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
476 if (SetThreadAffinityMask(GetCurrentThread(), (DWORD_PTR)(*((DWORD*)numaMask))))
477 return;
478 else
479 x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
480 #elif HAVE_LIBNUMA
481 if (numa_available() >= 0)
482 {
483 numa_run_on_node_mask((struct bitmask*)numaMask);
484 numa_set_interleave_mask((struct bitmask*)numaMask);
485 numa_set_localalloc();
486 return;
487 }
488 x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
489 #else
490 (void)numaMask;
491 #endif
492 return;
493 }
494
/* static */
/* Return the number of NUMA nodes on this machine; 1 when NUMA is absent or
 * unsupported on this platform */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    ULONG num = 1;
    if (GetNumaHighestNodeNumber(&num))
        num++;  /* convert highest node index to a node count */
    return (int)num;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
        return numa_max_node() + 1;
    else
        return 1;
#else
    return 1;
#endif
}
512
513 /* static */
getCpuCount()514 int ThreadPool::getCpuCount()
515 {
516 #if _WIN32
517 SYSTEM_INFO sysinfo;
518 GetSystemInfo(&sysinfo);
519 return sysinfo.dwNumberOfProcessors;
520 #elif __unix__
521 return sysconf(_SC_NPROCESSORS_ONLN);
522 #elif MACOS
523 int nm[2];
524 size_t len = 4;
525 uint32_t count;
526
527 nm[0] = CTL_HW;
528 nm[1] = HW_AVAILCPU;
529 sysctl(nm, 2, &count, &len, NULL, 0);
530
531 if (count < 1)
532 {
533 nm[1] = HW_NCPU;
534 sysctl(nm, 2, &count, &len, NULL, 0);
535 if (count < 1)
536 count = 1;
537 }
538
539 return count;
540 #else
541 return 2; // default to 2 threads, everywhere else
542 #endif
543 }
544
545 } // end namespace X265_NS
546