1 #ifdef RDK_THREADSAFE_SSS
2 #include <RDGeneral/Invariant.h>
3 #include <RDGeneral/RDLog.h>
4
5 #include <functional>
6 #include <iomanip>
7 #include <sstream>
8
9 #include "ConcurrentQueue.h"
10
11 using namespace RDKit;
12
13 //! method for testing basic ConcurrentQueue operations
testPushAndPop()14 void testPushAndPop() {
15 ConcurrentQueue<int>* q = new ConcurrentQueue<int>(4);
16 int e1, e2, e3;
17 TEST_ASSERT(q->isEmpty());
18
19 q->push(1);
20 q->push(2);
21 q->push(3);
22
23 TEST_ASSERT(!q->isEmpty());
24
25 TEST_ASSERT(q->pop(e1));
26 TEST_ASSERT(q->pop(e2));
27 TEST_ASSERT(q->pop(e3));
28
29 TEST_ASSERT(e1 == 1);
30 TEST_ASSERT(e2 == 2);
31 TEST_ASSERT(e3 == 3);
32
33 TEST_ASSERT(q->isEmpty());
34
35 delete (q);
36 }
37
produce(ConcurrentQueue<int> & q,const int numToProduce)38 void produce(ConcurrentQueue<int>& q, const int numToProduce) {
39 for (int i = 0; i < numToProduce; ++i) {
40 q.push(i);
41 }
42 }
43
consume(ConcurrentQueue<int> & q,std::vector<int> & result)44 void consume(ConcurrentQueue<int>& q, std::vector<int>& result) {
45 int element;
46 while (q.pop(element)) {
47 result.push_back(element);
48 }
49 }
50
51 //! multithreaded testing for ConcurrentQueue
testProducerConsumer(const int numProducerThreads,const int numConsumerThreads)52 bool testProducerConsumer(const int numProducerThreads,
53 const int numConsumerThreads) {
54 ConcurrentQueue<int> q(5);
55 TEST_ASSERT(q.isEmpty());
56
57 const int numToProduce = 10;
58
59 std::vector<std::thread> producers(numProducerThreads);
60 std::vector<std::thread> consumers(numConsumerThreads);
61 std::vector<std::vector<int>> results(numConsumerThreads);
62
63 //! start producer threads
64 for (int i = 0; i < numProducerThreads; i++) {
65 producers[i] = std::thread(produce, std::ref(q), numToProduce);
66 }
67 //! start consumer threads
68 for (int i = 0; i < numConsumerThreads; i++) {
69 consumers[i] = std::thread(consume, std::ref(q), std::ref(results[i]));
70 }
71
72 std::for_each(producers.begin(), producers.end(),
73 std::mem_fn(&std::thread::join));
74
75 //! the producer is done producing
76 q.setDone();
77
78 std::for_each(consumers.begin(), consumers.end(),
79 std::mem_fn(&std::thread::join));
80 TEST_ASSERT(q.isEmpty());
81
82 std::vector<int> frequency(numToProduce, 0);
83 for (auto& result : results) {
84 for (auto& element : result) {
85 frequency[element] += 1;
86 }
87 }
88 for (auto& freq : frequency) {
89 if (freq != numProducerThreads) {
90 return false;
91 }
92 }
93 return true;
94 }
95
testMultipleTimes()96 void testMultipleTimes() {
97 const int trials = 10000;
98 //! Single Producer, Single Consumer
99 for (int i = 0; i < trials; i++) {
100 bool result = testProducerConsumer(1, 1);
101 TEST_ASSERT(result);
102 }
103
104 //! Single Producer, Multiple Consumer
105 for (int i = 0; i < trials; i++) {
106 bool result = testProducerConsumer(1, 5);
107 TEST_ASSERT(result);
108 }
109
110 //! Multiple Producer, Single Consumer
111 for (int i = 0; i < trials; i++) {
112 bool result = testProducerConsumer(5, 1);
113 TEST_ASSERT(result);
114 }
115
116 //! Multiple Producer, Multiple Consumer
117 for (int i = 0; i < trials; i++) {
118 bool result = testProducerConsumer(2, 4);
119 TEST_ASSERT(result);
120 }
121 }
122
main()123 int main() {
124 RDLog::InitLogs();
125
126 BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
127 testPushAndPop();
128 BOOST_LOG(rdErrorLog) << "Finished: testPushAndPop() \n";
129 BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
130 #ifdef RDK_TEST_MULTITHREADED
131 BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
132 testMultipleTimes();
133 BOOST_LOG(rdErrorLog) << "Finished: testMultipleTimes() \n";
134 BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
135 #endif
136 return 0;
137 }
138
139 #endif
140