1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 #include <thrift/concurrency/Thread.h>
22 #include <thrift/concurrency/PlatformThreadFactory.h>
23 #include <thrift/concurrency/Monitor.h>
24 #include <thrift/concurrency/Mutex.h>
25 #include <thrift/concurrency/Util.h>
26 
27 #include <assert.h>
28 #include <iostream>
29 #include <vector>
30 
31 namespace apache {
32 namespace thrift {
33 namespace concurrency {
34 namespace test {
35 
36 using stdcxx::shared_ptr;
37 using namespace apache::thrift::concurrency;
38 
39 /**
40  * ThreadManagerTests class
41  *
42  * @version $Id:$
43  */
44 class ThreadFactoryTests {
45 
46 public:
47   /**
48    * Reap N threads
49    */
50   class ReapNTask : public Runnable {
51 
52   public:
ReapNTask(Monitor & monitor,int & activeCount)53     ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {}
54 
run()55     void run() {
56       Synchronized s(_monitor);
57 
58       if (--_count == 0) {
59         _monitor.notify();
60       }
61     }
62 
63     Monitor& _monitor;
64     int& _count;
65   };
66 
67   bool reapNThreads(int loop = 1, int count = 10) {
68 
69     PlatformThreadFactory threadFactory = PlatformThreadFactory();
70     shared_ptr<Monitor> monitor(new Monitor);
71 
72     for (int lix = 0; lix < loop; lix++) {
73 
74       int activeCount = 0;
75 
76       std::vector<shared_ptr<Thread> > threads;
77       int tix;
78 
79       for (tix = 0; tix < count; tix++) {
80         try {
81           ++activeCount;
82           threads.push_back(
83               threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, activeCount))));
catch(SystemResourceException & e)84         } catch (SystemResourceException& e) {
85           std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what()
86                     << std::endl;
87           throw e;
88         }
89       }
90 
91       tix = 0;
92       for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin();
93            thread != threads.end();
94            tix++, ++thread) {
95 
96         try {
97           (*thread)->start();
catch(SystemResourceException & e)98         } catch (SystemResourceException& e) {
99           std::cout << "\t\t\tfailed to start  " << lix* count + tix << " thread " << e.what()
100                     << std::endl;
101           throw e;
102         }
103       }
104 
105       {
106         Synchronized s(*monitor);
107         while (activeCount > 0) {
108           monitor->wait(1000);
109         }
110       }
111 
112       std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl;
113     }
114 
115     std::cout << "\t\t\tSuccess!" << std::endl;
116     return true;
117   }
118 
119   class SynchStartTask : public Runnable {
120 
121   public:
122     enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
123 
SynchStartTask(Monitor & monitor,volatile STATE & state)124     SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {}
125 
run()126     void run() {
127       {
128         Synchronized s(_monitor);
129         if (_state == SynchStartTask::STARTING) {
130           _state = SynchStartTask::STARTED;
131           _monitor.notify();
132         }
133       }
134 
135       {
136         Synchronized s(_monitor);
137         while (_state == SynchStartTask::STARTED) {
138           _monitor.wait();
139         }
140 
141         if (_state == SynchStartTask::STOPPING) {
142           _state = SynchStartTask::STOPPED;
143           _monitor.notifyAll();
144         }
145       }
146     }
147 
148   private:
149     Monitor& _monitor;
150     volatile STATE& _state;
151   };
152 
synchStartTest()153   bool synchStartTest() {
154 
155     Monitor monitor;
156 
157     SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
158 
159     shared_ptr<SynchStartTask> task
160         = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
161 
162     PlatformThreadFactory threadFactory = PlatformThreadFactory();
163 
164     shared_ptr<Thread> thread = threadFactory.newThread(task);
165 
166     if (state == SynchStartTask::UNINITIALIZED) {
167 
168       state = SynchStartTask::STARTING;
169 
170       thread->start();
171     }
172 
173     {
174       Synchronized s(monitor);
175       while (state == SynchStartTask::STARTING) {
176         monitor.wait();
177       }
178     }
179 
180     assert(state != SynchStartTask::STARTING);
181 
182     {
183       Synchronized s(monitor);
184 
185       try {
186         monitor.wait(100);
187       } catch (TimedOutException&) {
188       }
189 
190       if (state == SynchStartTask::STARTED) {
191 
192         state = SynchStartTask::STOPPING;
193 
194         monitor.notify();
195       }
196 
197       while (state == SynchStartTask::STOPPING) {
198         monitor.wait();
199       }
200     }
201 
202     assert(state == SynchStartTask::STOPPED);
203 
204     bool success = true;
205 
206     std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
207 
208     return true;
209   }
210 
211   /**
212    * The only guarantee a monitor timeout can give you is that
213    * it will take "at least" as long as the timeout, no less.
214    * There is absolutely no guarantee around regaining execution
215    * near the timeout.  On a busy system (like inside a third party
216    * CI environment) it could take quite a bit longer than the
217    * requested timeout, and that's ok.
218    */
219 
220   bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) {
221 
222     Monitor monitor;
223 
224     int64_t startTime = Util::currentTime();
225 
226     for (int64_t ix = 0; ix < count; ix++) {
227       {
228         Synchronized s(monitor);
229         try {
230           monitor.wait(timeout);
catch(TimedOutException &)231         } catch (TimedOutException&) {
232         }
233       }
234     }
235 
236     int64_t endTime = Util::currentTime();
237 
238   bool success = (endTime - startTime) >= (count * timeout);
239 
240     std::cout << "\t\t\t" << (success ? "Success" : "Failure")
241               << ": minimum required time to elapse " << count * timeout
242               << "ms; actual elapsed time " << endTime - startTime << "ms"
243               << std::endl;
244 
245     return success;
246   }
247 
248   class FloodTask : public Runnable {
249   public:
FloodTask(const size_t id,Monitor & mon)250     FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {}
~FloodTask()251     ~FloodTask() {
252       if (_id % 10000 == 0) {
253 		Synchronized sync(_mon);
254         std::cout << "\t\tthread " << _id << " done" << std::endl;
255       }
256     }
257 
run()258     void run() {
259       if (_id % 10000 == 0) {
260 		Synchronized sync(_mon);
261         std::cout << "\t\tthread " << _id << " started" << std::endl;
262       }
263     }
264     const size_t _id;
265     Monitor& _mon;
266   };
267 
foo(PlatformThreadFactory * tf)268   void foo(PlatformThreadFactory* tf) { (void)tf; }
269 
270   bool floodNTest(size_t loop = 1, size_t count = 100000) {
271 
272     bool success = false;
273     Monitor mon;
274 
275     for (size_t lix = 0; lix < loop; lix++) {
276 
277       PlatformThreadFactory threadFactory = PlatformThreadFactory();
278       threadFactory.setDetached(true);
279 
280       for (size_t tix = 0; tix < count; tix++) {
281 
282         try {
283 
284           shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon));
285           shared_ptr<Thread> thread = threadFactory.newThread(task);
286           thread->start();
287 
catch(TException & e)288         } catch (TException& e) {
289 
290           std::cout << "\t\t\tfailed to start  " << lix* count + tix << " thread " << e.what()
291                     << std::endl;
292 
293           return success;
294         }
295       }
296 
297       Synchronized sync(mon);
298       std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
299       success = true;
300     }
301 
302     return success;
303   }
304 };
305 
306 }
307 }
308 }
309 } // apache::thrift::concurrency::test
310