1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 
11 #include "llvm/ADT/DenseSet.h"
12 #include "llvm/ADT/STLExtras.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/ADT/Triple.h"
15 #include "llvm/Support/Host.h"
16 #include "llvm/Support/TargetSelect.h"
17 #include "llvm/Support/Threading.h"
18 
19 #include "gtest/gtest.h"
20 
21 using namespace llvm;
22 
23 // Fixture for the unittests, allowing to *temporarily* disable the unittests
24 // on a particular platform
25 class ThreadPoolTest : public testing::Test {
26   Triple Host;
27   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
28   SmallVector<Triple::OSType, 4> UnsupportedOSs;
29   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
30 protected:
31   // This is intended for platform as a temporary "XFAIL"
isUnsupportedOSOrEnvironment()32   bool isUnsupportedOSOrEnvironment() {
33     Triple Host(Triple::normalize(sys::getProcessTriple()));
34 
35     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
36         UnsupportedEnvironments.end())
37       return true;
38 
39     if (is_contained(UnsupportedOSs, Host.getOS()))
40       return true;
41 
42     if (is_contained(UnsupportedArchs, Host.getArch()))
43       return true;
44 
45     return false;
46   }
47 
ThreadPoolTest()48   ThreadPoolTest() {
49     // Add unsupported configuration here, example:
50     //   UnsupportedArchs.push_back(Triple::x86_64);
51 
52     // See https://llvm.org/bugs/show_bug.cgi?id=25829
53     UnsupportedArchs.push_back(Triple::ppc64le);
54     UnsupportedArchs.push_back(Triple::ppc64);
55   }
56 
57   /// Make sure this thread not progress faster than the main thread.
waitForMainThread()58   void waitForMainThread() {
59     std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
60     WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
61   }
62 
63   /// Set the readiness of the main thread.
setMainThreadReady()64   void setMainThreadReady() {
65     {
66       std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
67       MainThreadReady = true;
68     }
69     WaitMainThread.notify_all();
70   }
71 
SetUp()72   void SetUp() override { MainThreadReady = false; }
73 
74   void RunOnAllSockets(ThreadPoolStrategy S);
75 
76   std::condition_variable WaitMainThread;
77   std::mutex WaitMainThreadMutex;
78   bool MainThreadReady = false;
79 };
80 
81 #define CHECK_UNSUPPORTED() \
82   do { \
83     if (isUnsupportedOSOrEnvironment()) \
84       return; \
85   } while (0); \
86 
TEST_F(ThreadPoolTest,AsyncBarrier)87 TEST_F(ThreadPoolTest, AsyncBarrier) {
88   CHECK_UNSUPPORTED();
89   // test that async & barrier work together properly.
90 
91   std::atomic_int checked_in{0};
92 
93   ThreadPool Pool;
94   for (size_t i = 0; i < 5; ++i) {
95     Pool.async([this, &checked_in] {
96       waitForMainThread();
97       ++checked_in;
98     });
99   }
100   ASSERT_EQ(0, checked_in);
101   setMainThreadReady();
102   Pool.wait();
103   ASSERT_EQ(5, checked_in);
104 }
105 
TestFunc(std::atomic_int & checked_in,int i)106 static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }
107 
TEST_F(ThreadPoolTest,AsyncBarrierArgs)108 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
109   CHECK_UNSUPPORTED();
110   // Test that async works with a function requiring multiple parameters.
111   std::atomic_int checked_in{0};
112 
113   ThreadPool Pool;
114   for (size_t i = 0; i < 5; ++i) {
115     Pool.async(TestFunc, std::ref(checked_in), i);
116   }
117   Pool.wait();
118   ASSERT_EQ(10, checked_in);
119 }
120 
TEST_F(ThreadPoolTest,Async)121 TEST_F(ThreadPoolTest, Async) {
122   CHECK_UNSUPPORTED();
123   ThreadPool Pool;
124   std::atomic_int i{0};
125   Pool.async([this, &i] {
126     waitForMainThread();
127     ++i;
128   });
129   Pool.async([&i] { ++i; });
130   ASSERT_NE(2, i.load());
131   setMainThreadReady();
132   Pool.wait();
133   ASSERT_EQ(2, i.load());
134 }
135 
TEST_F(ThreadPoolTest,GetFuture)136 TEST_F(ThreadPoolTest, GetFuture) {
137   CHECK_UNSUPPORTED();
138   ThreadPool Pool(hardware_concurrency(2));
139   std::atomic_int i{0};
140   Pool.async([this, &i] {
141     waitForMainThread();
142     ++i;
143   });
144   // Force the future using get()
145   Pool.async([&i] { ++i; }).get();
146   ASSERT_NE(2, i.load());
147   setMainThreadReady();
148   Pool.wait();
149   ASSERT_EQ(2, i.load());
150 }
151 
TEST_F(ThreadPoolTest,PoolDestruction)152 TEST_F(ThreadPoolTest, PoolDestruction) {
153   CHECK_UNSUPPORTED();
154   // Test that we are waiting on destruction
155   std::atomic_int checked_in{0};
156   {
157     ThreadPool Pool;
158     for (size_t i = 0; i < 5; ++i) {
159       Pool.async([this, &checked_in] {
160         waitForMainThread();
161         ++checked_in;
162       });
163     }
164     ASSERT_EQ(0, checked_in);
165     setMainThreadReady();
166   }
167   ASSERT_EQ(5, checked_in);
168 }
169 
170 #if LLVM_ENABLE_THREADS == 1
171 
RunOnAllSockets(ThreadPoolStrategy S)172 void ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
173   // FIXME: Skip these tests on non-Windows because multi-socket system were not
174   // tested on Unix yet, and llvm::get_thread_affinity_mask() isn't implemented
175   // for Unix.
176   Triple Host(Triple::normalize(sys::getProcessTriple()));
177   if (!Host.isOSWindows())
178     return;
179 
180   llvm::DenseSet<llvm::BitVector> ThreadsUsed;
181   std::mutex Lock;
182   {
183     std::condition_variable AllThreads;
184     std::mutex AllThreadsLock;
185     unsigned Active = 0;
186 
187     ThreadPool Pool(S);
188     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
189       Pool.async([&] {
190         {
191           std::lock_guard<std::mutex> Guard(AllThreadsLock);
192           ++Active;
193           AllThreads.notify_one();
194         }
195         waitForMainThread();
196         std::lock_guard<std::mutex> Guard(Lock);
197         auto Mask = llvm::get_thread_affinity_mask();
198         ThreadsUsed.insert(Mask);
199       });
200     }
201     ASSERT_EQ(true, ThreadsUsed.empty());
202     {
203       std::unique_lock<std::mutex> Guard(AllThreadsLock);
204       AllThreads.wait(Guard,
205                       [&]() { return Active == S.compute_thread_count(); });
206     }
207     setMainThreadReady();
208   }
209   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
210 }
211 
TEST_F(ThreadPoolTest,AllThreads_UseAllRessources)212 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
213   CHECK_UNSUPPORTED();
214   RunOnAllSockets({});
215 }
216 
TEST_F(ThreadPoolTest,AllThreads_OneThreadPerCore)217 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
218   CHECK_UNSUPPORTED();
219   RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
220 }
221 
222 #endif
223