1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 #include <thrift/concurrency/ThreadManager.h>
22 #include <thrift/concurrency/ThreadFactory.h>
23 #include <thrift/concurrency/Monitor.h>
24 
25 #include <assert.h>
26 #include <deque>
27 #include <set>
28 #include <iostream>
29 #include <stdint.h>
30 
31 namespace apache {
32 namespace thrift {
33 namespace concurrency {
34 namespace test {
35 
36 using namespace apache::thrift::concurrency;
37 
38 static std::deque<std::shared_ptr<Runnable> > m_expired;
expiredNotifier(std::shared_ptr<Runnable> runnable)39 static void expiredNotifier(std::shared_ptr<Runnable> runnable)
40 {
41   m_expired.push_back(runnable);
42 }
43 
sleep_(int64_t millisec)44 static void sleep_(int64_t millisec) {
45   Monitor _sleep;
46   Synchronized s(_sleep);
47 
48   try {
49     _sleep.wait(millisec);
50   } catch (TimedOutException&) {
51     ;
52   } catch (...) {
53     assert(0);
54   }
55 }
56 
57 class ThreadManagerTests {
58 
59 public:
60   class Task : public Runnable {
61 
62   public:
Task(Monitor & monitor,size_t & count,int64_t timeout)63     Task(Monitor& monitor, size_t& count, int64_t timeout)
64       : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
65 
run()66     void run() override {
67 
68       _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
69 
70       sleep_(_timeout);
71 
72       _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
73 
74       _done = true;
75 
76       {
77         Synchronized s(_monitor);
78 
79         // std::cout << "Thread " << _count << " completed " << std::endl;
80 
81         _count--;
82         if (_count % 10000 == 0) {
83           _monitor.notify();
84         }
85       }
86     }
87 
88     Monitor& _monitor;
89     size_t& _count;
90     int64_t _timeout;
91     int64_t _startTime;
92     int64_t _endTime;
93     bool _done;
94     Monitor _sleep;
95   };
96 
97   /**
98    * Dispatch count tasks, each of which blocks for timeout milliseconds then
99    * completes. Verify that all tasks completed and that thread manager cleans
100    * up properly on delete.
101    */
102   bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
103 
104     Monitor monitor;
105 
106     size_t activeCount = count;
107 
108     shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
109 
110     shared_ptr<ThreadFactory> threadFactory
111         = shared_ptr<ThreadFactory>(new ThreadFactory(false));
112 
113     threadManager->threadFactory(threadFactory);
114 
115     threadManager->start();
116 
117     std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
118 
119     for (size_t ix = 0; ix < count; ix++) {
120 
121       tasks.insert(shared_ptr<ThreadManagerTests::Task>(
122           new ThreadManagerTests::Task(monitor, activeCount, timeout)));
123     }
124 
125     int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
126 
127     for (auto ix = tasks.begin();
128          ix != tasks.end();
129          ix++) {
130 
131       threadManager->add(*ix);
132     }
133 
134     std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
135 
136     {
137       Synchronized s(monitor);
138 
139       while (activeCount > 0) {
140         std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
141         monitor.wait();
142       }
143     }
144 
145     int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
146 
147     int64_t firstTime = 9223372036854775807LL;
148     int64_t lastTime = 0;
149 
150     double averageTime = 0;
151     int64_t minTime = 9223372036854775807LL;
152     int64_t maxTime = 0;
153 
154     for (auto ix = tasks.begin();
155          ix != tasks.end();
156          ix++) {
157 
158       shared_ptr<ThreadManagerTests::Task> task = *ix;
159 
160       int64_t delta = task->_endTime - task->_startTime;
161 
162       assert(delta > 0);
163 
164       if (task->_startTime < firstTime) {
165         firstTime = task->_startTime;
166       }
167 
168       if (task->_endTime > lastTime) {
169         lastTime = task->_endTime;
170       }
171 
172       if (delta < minTime) {
173         minTime = delta;
174       }
175 
176       if (delta > maxTime) {
177         maxTime = delta;
178       }
179 
180       averageTime += delta;
181     }
182 
183     averageTime /= count;
184 
185     std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
186               << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
187               << "ms" << std::endl;
188 
189     bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
190 
191     std::cout << "\t\t\t" << (success ? "Success" : "Failure")
192               << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
193               << "ms" << std::endl;
194 
195     return success;
196   }
197 
198   class BlockTask : public Runnable {
199 
200   public:
BlockTask(Monitor & entryMonitor,Monitor & blockMonitor,bool & blocked,Monitor & doneMonitor,size_t & count)201     BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
202       : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
203 
run()204     void run() override {
205       {
206         Synchronized s(_entryMonitor);
207         _entered = true;
208         _entryMonitor.notify();
209       }
210 
211       {
212         Synchronized s(_blockMonitor);
213         while (_blocked) {
214           _blockMonitor.wait();
215         }
216       }
217 
218       {
219         Synchronized s(_doneMonitor);
220         if (--_count == 0) {
221           _doneMonitor.notify();
222         }
223       }
224     }
225 
226     Monitor& _entryMonitor;
227     bool _entered;
228     Monitor& _blockMonitor;
229     bool& _blocked;
230     Monitor& _doneMonitor;
231     size_t& _count;
232   };
233 
234   /**
235    * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
236    * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */
237 
238   bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
239     (void)timeout;
240     bool success = false;
241 
242     try {
243 
244       Monitor entryMonitor;   // not used by this test
245       Monitor blockMonitor;
246       bool blocked[] = {true, true, true};
247       Monitor doneMonitor;
248 
249       size_t pendingTaskMaxCount = workerCount;
250 
251       size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
252 
253       shared_ptr<ThreadManager> threadManager
254           = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
255 
256       shared_ptr<ThreadFactory> threadFactory
257           = shared_ptr<ThreadFactory>(new ThreadFactory());
258 
259       threadManager->threadFactory(threadFactory);
260 
261       threadManager->start();
262 
263       std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
264       tasks.reserve(workerCount + pendingTaskMaxCount);
265 
266       for (size_t ix = 0; ix < workerCount; ix++) {
267 
268         tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
269             new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
270       }
271 
272       for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
273 
274         tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
275             new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
276       }
277 
278       for (auto ix = tasks.begin();
279            ix != tasks.end();
280            ix++) {
281         threadManager->add(*ix);
282       }
283 
284       if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
285         throw TException("Unexpected pending task count");
286       }
287 
288       shared_ptr<ThreadManagerTests::BlockTask> extraTask(
289           new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
290 
291       try {
292         threadManager->add(extraTask, 1);
293         throw TException("Unexpected success adding task in excess of pending task count");
catch(TooManyPendingTasksException &)294       } catch (TooManyPendingTasksException&) {
295         throw TException("Should have timed out adding task in excess of pending task count");
296       } catch (TimedOutException&) {
297         // Expected result
298       }
299 
300       try {
301         threadManager->add(extraTask, -1);
302         throw TException("Unexpected success adding task in excess of pending task count");
catch(TimedOutException &)303       } catch (TimedOutException&) {
304         throw TException("Unexpected timeout adding task in excess of pending task count");
305       } catch (TooManyPendingTasksException&) {
306         // Expected result
307       }
308 
309       std::cout << "\t\t\t"
310                 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
311 
312       {
313         Synchronized s(blockMonitor);
314         blocked[0] = false;
315         blockMonitor.notifyAll();
316       }
317 
318       {
319         Synchronized s(doneMonitor);
320         while (activeCounts[0] != 0) {
321           doneMonitor.wait();
322         }
323       }
324 
325       std::cout << "\t\t\t"
326                 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
327 
328       try {
329         threadManager->add(extraTask, 1);
catch(TimedOutException &)330       } catch (TimedOutException&) {
331         std::cout << "\t\t\t"
332                   << "add timed out unexpectedly" << std::endl;
333         throw TException("Unexpected timeout adding task");
334 
335       } catch (TooManyPendingTasksException&) {
336         std::cout << "\t\t\t"
337                   << "add encountered too many pending exepctions" << std::endl;
338         throw TException("Unexpected timeout adding task");
339       }
340 
341       // Wake up tasks that were pending before and wait for them to complete
342 
343       {
344         Synchronized s(blockMonitor);
345         blocked[1] = false;
346         blockMonitor.notifyAll();
347       }
348 
349       {
350         Synchronized s(doneMonitor);
351         while (activeCounts[1] != 0) {
352           doneMonitor.wait();
353         }
354       }
355 
356       // Wake up the extra task and wait for it to complete
357 
358       {
359         Synchronized s(blockMonitor);
360         blocked[2] = false;
361         blockMonitor.notifyAll();
362       }
363 
364       {
365         Synchronized s(doneMonitor);
366         while (activeCounts[2] != 0) {
367           doneMonitor.wait();
368         }
369       }
370 
371       threadManager->stop();
372 
373       if (!(success = (threadManager->totalTaskCount() == 0))) {
374         throw TException("Unexpected total task count");
375       }
376 
catch(TException & e)377     } catch (TException& e) {
378       std::cout << "ERROR: " << e.what() << std::endl;
379     }
380 
381     std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
382     return success;
383   }
384 
385 
apiTest()386   bool apiTest() {
387 
388     // prove currentTime has milliseconds granularity since many other things depend on it
389     int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
390     sleep_(100);
391     int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
392     if (b - a < 50 || b - a > 150) {
393       std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
394       return false;
395     }
396 
397     return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
398 
399   }
400 
  /**
   * Drives a ThreadManager created with one worker through its public API:
   * adding and removing workers, adding and removing tasks, removeNextPending(),
   * task expiration with the expiredNotifier callback, and finally running a
   * blocking task on a worker. After each step the manager's counters are
   * compared with the expected values.
   *
   * @param threadFactory factory installed on the manager before it is started
   * @return true if every check passes; false at the first mismatch, after a
   *         diagnostic has been written to std::cerr
   */
  bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
  {
    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
    threadManager->threadFactory(threadFactory);

    std::cout << "\t\t\t\tstarting.. " << std::endl;

    threadManager->start();
    threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));

    // EXPECT evaluates FUNC once and compares the result with COUNT. On a
    // mismatch it prints both expressions (stringified) plus the actual value to
    // std::cerr and returns false from this function.
    // NOTE(review): the macro is never #undef'd, so it remains defined for the
    // rest of the translation unit.
#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }

    EXPECT(threadManager->workerCount(), 1);
    EXPECT(threadManager->idleWorkerCount(), 1);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;

    threadManager->addWorker();

    EXPECT(threadManager->workerCount(), 2);
    EXPECT(threadManager->idleWorkerCount(), 2);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;

    threadManager->removeWorker();

    EXPECT(threadManager->workerCount(), 1);
    EXPECT(threadManager->idleWorkerCount(), 1);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;

    threadManager->removeWorker();

    // From here until a worker is added again there are no workers, so every
    // task that is added is expected to stay pending.
    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tadd blocking task.. " << std::endl;

    // We're going to throw a blocking task into the mix
    Monitor entryMonitor;   // signaled when task is running
    Monitor blockMonitor;   // to be signaled to unblock the task
    bool blocked(true);     // set to false before notifying
    Monitor doneMonitor;    // signaled when count reaches zero
    size_t activeCount = 1;
    shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
      new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
    threadManager->add(blockingTask);

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 1);

    std::cout << "\t\t\t\tadd other task.. " << std::endl;

    shared_ptr<ThreadManagerTests::Task> otherTask(
      new ThreadManagerTests::Task(doneMonitor, activeCount, 0));

    threadManager->add(otherTask);

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 2);

    std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;

    // Removing a specific task must leave the other pending task in place.
    threadManager->remove(blockingTask);

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 1);

    std::cout << "\t\t\t\tremove next pending task.." << std::endl;

    // The only task still pending is otherTask, so that is what must come back.
    shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
    if (nextTask != otherTask) {
      std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
      return false;
    }

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;

    // With nothing pending, an empty pointer is expected.
    nextTask = threadManager->removeNextPending();
    if (nextTask) {
      std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
      return false;
    }

    std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;

    shared_ptr<ThreadManagerTests::Task> expiredTask(
      new ThreadManagerTests::Task(doneMonitor, activeCount, 0));

    // The same Task object is queued twice with a 1 ms expiration, with the
    // non-expiring blockingTask in between.
    threadManager->add(expiredTask, 0, 1);
    threadManager->add(blockingTask);       // add one that hasn't expired to make sure it gets skipped
    threadManager->add(expiredTask, 0, 1);  // add a second expired to ensure removeExpiredTasks removes both

    sleep_(50);  // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond

    // All three entries are still counted as pending and none as expired until
    // removeExpiredTasks() is called below.
    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 3);
    EXPECT(threadManager->expiredTaskCount(), 0);

    std::cout << "\t\t\t\tremove expired tasks.." << std::endl;

    // The callback must not have fired yet.
    if (!m_expired.empty()) {
      std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
      return false;
    }

    threadManager->removeExpiredTasks();

    // Both expired entries must have been reported through expiredNotifier.
    if (m_expired.size() != 2) {
      std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
      return false;
    }

    if (m_expired.front() != expiredTask) {
      std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
      return false;
    }
    m_expired.pop_front();

    if (m_expired.front() != expiredTask) {
      std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
      return false;
    }

    // Reset the global record so later checks start from an empty queue.
    m_expired.clear();

    threadManager->remove(blockingTask);

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);
    EXPECT(threadManager->expiredTaskCount(), 2);

    std::cout << "\t\t\t\tadd expired task (again).." << std::endl;

    threadManager->add(expiredTask, 0, 1);  // expires in 1ms
    sleep_(50);  // make sure enough time elapses for it to expire - the shortest expiration time is 1ms

    std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;

    // This time no explicit removeExpiredTasks() call is made; the expired task
    // is expected to be reported once a worker is available.
    threadManager->addWorker();
    sleep_(100);  // make sure it has time to spin up and expire the task

    if (m_expired.empty()) {
      std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
      return false;
    }

    if (m_expired.front() != expiredTask) {
      std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
      return false;
    }

    m_expired.clear();

    // expiredTaskCount is cumulative: two from before plus this one.
    EXPECT(threadManager->workerCount(), 1);
    EXPECT(threadManager->idleWorkerCount(), 1);
    EXPECT(threadManager->pendingTaskCount(), 0);
    EXPECT(threadManager->expiredTaskCount(), 3);

    std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
    // Only one worker exists, so asking to remove two must be rejected.
    try {
      threadManager->removeWorker(2);
      std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
      return false;
    } catch (const InvalidArgumentException&) {
      /* expected */
    }

    std::cout << "\t\t\t\tremove worker.. " << std::endl;

    threadManager->removeWorker();

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);
    EXPECT(threadManager->expiredTaskCount(), 3);

    std::cout << "\t\t\t\tadd blocking task.. " << std::endl;

    threadManager->add(blockingTask);

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 1);

    std::cout << "\t\t\t\tadd worker.. " << std::endl;

    threadManager->addWorker();
    // Wait until the blocking task has actually been entered, so the counters
    // checked next reflect a busy worker.
    {
      Synchronized s(entryMonitor);
      while (!blockingTask->_entered) {
        entryMonitor.wait();
      }
    }

    EXPECT(threadManager->workerCount(), 1);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;

    // Release the task first, then remove the worker.
    {
      Synchronized s(blockMonitor);
      blocked = false;
      blockMonitor.notifyAll();
    }
    threadManager->removeWorker();

    EXPECT(threadManager->workerCount(), 0);
    EXPECT(threadManager->idleWorkerCount(), 0);
    EXPECT(threadManager->pendingTaskCount(), 0);

    std::cout << "\t\t\t\tcleanup.. " << std::endl;

    // Drop the task and the manager explicitly before the monitors above go out
    // of scope.
    blockingTask.reset();
    threadManager.reset();
    return true;
  }
632 };
633 
634 }
635 }
636 }
637 } // apache::thrift::concurrency
638 
639 using namespace apache::thrift::concurrency::test;
640