1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #include <thrift/thrift-config.h> 21 #include <thrift/concurrency/Thread.h> 22 #include <thrift/concurrency/PlatformThreadFactory.h> 23 #include <thrift/concurrency/Monitor.h> 24 #include <thrift/concurrency/Mutex.h> 25 #include <thrift/concurrency/Util.h> 26 27 #include <assert.h> 28 #include <iostream> 29 #include <vector> 30 31 namespace apache { 32 namespace thrift { 33 namespace concurrency { 34 namespace test { 35 36 using stdcxx::shared_ptr; 37 using namespace apache::thrift::concurrency; 38 39 /** 40 * ThreadManagerTests class 41 * 42 * @version $Id:$ 43 */ 44 class ThreadFactoryTests { 45 46 public: 47 /** 48 * Reap N threads 49 */ 50 class ReapNTask : public Runnable { 51 52 public: ReapNTask(Monitor & monitor,int & activeCount)53 ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {} 54 run()55 void run() { 56 Synchronized s(_monitor); 57 58 if (--_count == 0) { 59 _monitor.notify(); 60 } 61 } 62 63 Monitor& _monitor; 64 int& _count; 65 }; 66 67 bool reapNThreads(int loop = 1, int count = 10) { 68 69 PlatformThreadFactory threadFactory = PlatformThreadFactory(); 70 shared_ptr<Monitor> monitor(new Monitor); 71 72 for (int lix = 0; lix < loop; lix++) { 73 74 int activeCount = 0; 75 76 std::vector<shared_ptr<Thread> > threads; 77 int tix; 78 79 for (tix = 0; tix < count; tix++) { 80 try { 81 ++activeCount; 82 threads.push_back( 83 threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, activeCount)))); catch(SystemResourceException & e)84 } catch (SystemResourceException& e) { 85 std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what() 86 << std::endl; 87 throw e; 88 } 89 } 90 91 tix = 0; 92 for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin(); 93 thread != threads.end(); 94 tix++, ++thread) { 95 96 try { 97 (*thread)->start(); catch(SystemResourceException & e)98 } catch (SystemResourceException& e) { 99 std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() 100 << std::endl; 101 throw e; 102 } 103 } 104 105 { 106 Synchronized s(*monitor); 107 while (activeCount > 0) { 108 monitor->wait(1000); 109 } 110 } 111 112 std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl; 113 } 114 115 std::cout << "\t\t\tSuccess!" << std::endl; 116 return true; 117 } 118 119 class SynchStartTask : public Runnable { 120 121 public: 122 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; 123 SynchStartTask(Monitor & monitor,volatile STATE & state)124 SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {} 125 run()126 void run() { 127 { 128 Synchronized s(_monitor); 129 if (_state == SynchStartTask::STARTING) { 130 _state = SynchStartTask::STARTED; 131 _monitor.notify(); 132 } 133 } 134 135 { 136 Synchronized s(_monitor); 137 while (_state == SynchStartTask::STARTED) { 138 _monitor.wait(); 139 } 140 141 if (_state == SynchStartTask::STOPPING) { 142 _state = SynchStartTask::STOPPED; 143 _monitor.notifyAll(); 144 } 145 } 146 } 147 148 private: 149 Monitor& _monitor; 150 volatile STATE& _state; 151 }; 152 synchStartTest()153 bool synchStartTest() { 154 155 Monitor monitor; 156 157 SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; 158 159 shared_ptr<SynchStartTask> task 160 = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state)); 161 162 PlatformThreadFactory threadFactory = PlatformThreadFactory(); 163 164 shared_ptr<Thread> thread = threadFactory.newThread(task); 165 166 if (state == SynchStartTask::UNINITIALIZED) { 167 168 state = SynchStartTask::STARTING; 169 170 thread->start(); 171 } 172 173 { 174 Synchronized s(monitor); 175 while (state == SynchStartTask::STARTING) { 176 monitor.wait(); 177 } 178 } 179 180 assert(state != SynchStartTask::STARTING); 181 182 { 183 Synchronized s(monitor); 184 185 try { 186 monitor.wait(100); 187 } catch (TimedOutException&) { 188 } 189 190 if (state == SynchStartTask::STARTED) { 191 192 state = SynchStartTask::STOPPING; 193 194 monitor.notify(); 195 } 196 197 while (state == SynchStartTask::STOPPING) { 198 monitor.wait(); 199 } 200 } 201 202 assert(state == SynchStartTask::STOPPED); 203 204 bool success = true; 205 206 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl; 207 208 return true; 209 } 210 211 /** 212 * The only guarantee a monitor timeout can give you is that 213 * it will take "at least" as long as the timeout, no less. 214 * There is absolutely no guarantee around regaining execution 215 * near the timeout. On a busy system (like inside a third party 216 * CI environment) it could take quite a bit longer than the 217 * requested timeout, and that's ok. 218 */ 219 220 bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) { 221 222 Monitor monitor; 223 224 int64_t startTime = Util::currentTime(); 225 226 for (int64_t ix = 0; ix < count; ix++) { 227 { 228 Synchronized s(monitor); 229 try { 230 monitor.wait(timeout); catch(TimedOutException &)231 } catch (TimedOutException&) { 232 } 233 } 234 } 235 236 int64_t endTime = Util::currentTime(); 237 238 bool success = (endTime - startTime) >= (count * timeout); 239 240 std::cout << "\t\t\t" << (success ? "Success" : "Failure") 241 << ": minimum required time to elapse " << count * timeout 242 << "ms; actual elapsed time " << endTime - startTime << "ms" 243 << std::endl; 244 245 return success; 246 } 247 248 class FloodTask : public Runnable { 249 public: FloodTask(const size_t id,Monitor & mon)250 FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {} ~FloodTask()251 ~FloodTask() { 252 if (_id % 10000 == 0) { 253 Synchronized sync(_mon); 254 std::cout << "\t\tthread " << _id << " done" << std::endl; 255 } 256 } 257 run()258 void run() { 259 if (_id % 10000 == 0) { 260 Synchronized sync(_mon); 261 std::cout << "\t\tthread " << _id << " started" << std::endl; 262 } 263 } 264 const size_t _id; 265 Monitor& _mon; 266 }; 267 foo(PlatformThreadFactory * tf)268 void foo(PlatformThreadFactory* tf) { (void)tf; } 269 270 bool floodNTest(size_t loop = 1, size_t count = 100000) { 271 272 bool success = false; 273 Monitor mon; 274 275 for (size_t lix = 0; lix < loop; lix++) { 276 277 PlatformThreadFactory threadFactory = PlatformThreadFactory(); 278 threadFactory.setDetached(true); 279 280 for (size_t tix = 0; tix < count; tix++) { 281 282 try { 283 284 shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon)); 285 shared_ptr<Thread> thread = threadFactory.newThread(task); 286 thread->start(); 287 catch(TException & e)288 } catch (TException& e) { 289 290 std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() 291 << std::endl; 292 293 return success; 294 } 295 } 296 297 Synchronized sync(mon); 298 std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl; 299 success = true; 300 } 301 302 return success; 303 } 304 }; 305 306 } 307 } 308 } 309 } // apache::thrift::concurrency::test 310