1 #ifdef RDK_THREADSAFE_SSS
2 #include <RDGeneral/Invariant.h>
3 #include <RDGeneral/RDLog.h>
4 
5 #include <functional>
6 #include <iomanip>
7 #include <sstream>
8 
9 #include "ConcurrentQueue.h"
10 
11 using namespace RDKit;
12 
13 //! method for testing basic ConcurrentQueue operations
testPushAndPop()14 void testPushAndPop() {
15   ConcurrentQueue<int>* q = new ConcurrentQueue<int>(4);
16   int e1, e2, e3;
17   TEST_ASSERT(q->isEmpty());
18 
19   q->push(1);
20   q->push(2);
21   q->push(3);
22 
23   TEST_ASSERT(!q->isEmpty());
24 
25   TEST_ASSERT(q->pop(e1));
26   TEST_ASSERT(q->pop(e2));
27   TEST_ASSERT(q->pop(e3));
28 
29   TEST_ASSERT(e1 == 1);
30   TEST_ASSERT(e2 == 2);
31   TEST_ASSERT(e3 == 3);
32 
33   TEST_ASSERT(q->isEmpty());
34 
35   delete (q);
36 }
37 
produce(ConcurrentQueue<int> & q,const int numToProduce)38 void produce(ConcurrentQueue<int>& q, const int numToProduce) {
39   for (int i = 0; i < numToProduce; ++i) {
40     q.push(i);
41   }
42 }
43 
consume(ConcurrentQueue<int> & q,std::vector<int> & result)44 void consume(ConcurrentQueue<int>& q, std::vector<int>& result) {
45   int element;
46   while (q.pop(element)) {
47     result.push_back(element);
48   }
49 }
50 
51 //! multithreaded testing for ConcurrentQueue
testProducerConsumer(const int numProducerThreads,const int numConsumerThreads)52 bool testProducerConsumer(const int numProducerThreads,
53                           const int numConsumerThreads) {
54   ConcurrentQueue<int> q(5);
55   TEST_ASSERT(q.isEmpty());
56 
57   const int numToProduce = 10;
58 
59   std::vector<std::thread> producers(numProducerThreads);
60   std::vector<std::thread> consumers(numConsumerThreads);
61   std::vector<std::vector<int>> results(numConsumerThreads);
62 
63   //! start producer threads
64   for (int i = 0; i < numProducerThreads; i++) {
65     producers[i] = std::thread(produce, std::ref(q), numToProduce);
66   }
67   //! start consumer threads
68   for (int i = 0; i < numConsumerThreads; i++) {
69     consumers[i] = std::thread(consume, std::ref(q), std::ref(results[i]));
70   }
71 
72   std::for_each(producers.begin(), producers.end(),
73                 std::mem_fn(&std::thread::join));
74 
75   //! the producer is done producing
76   q.setDone();
77 
78   std::for_each(consumers.begin(), consumers.end(),
79                 std::mem_fn(&std::thread::join));
80   TEST_ASSERT(q.isEmpty());
81 
82   std::vector<int> frequency(numToProduce, 0);
83   for (auto& result : results) {
84     for (auto& element : result) {
85       frequency[element] += 1;
86     }
87   }
88   for (auto& freq : frequency) {
89     if (freq != numProducerThreads) {
90       return false;
91     }
92   }
93   return true;
94 }
95 
testMultipleTimes()96 void testMultipleTimes() {
97   const int trials = 10000;
98   //! Single Producer, Single Consumer
99   for (int i = 0; i < trials; i++) {
100     bool result = testProducerConsumer(1, 1);
101     TEST_ASSERT(result);
102   }
103 
104   //! Single Producer, Multiple Consumer
105   for (int i = 0; i < trials; i++) {
106     bool result = testProducerConsumer(1, 5);
107     TEST_ASSERT(result);
108   }
109 
110   //! Multiple Producer, Single Consumer
111   for (int i = 0; i < trials; i++) {
112     bool result = testProducerConsumer(5, 1);
113     TEST_ASSERT(result);
114   }
115 
116   //! Multiple Producer, Multiple Consumer
117   for (int i = 0; i < trials; i++) {
118     bool result = testProducerConsumer(2, 4);
119     TEST_ASSERT(result);
120   }
121 }
122 
main()123 int main() {
124   RDLog::InitLogs();
125 
126   BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
127   testPushAndPop();
128   BOOST_LOG(rdErrorLog) << "Finished: testPushAndPop() \n";
129   BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
130 #ifdef RDK_TEST_MULTITHREADED
131   BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
132   testMultipleTimes();
133   BOOST_LOG(rdErrorLog) << "Finished: testMultipleTimes() \n";
134   BOOST_LOG(rdErrorLog) << "\n-----------------------------------------\n";
135 #endif
136   return 0;
137 }
138 
139 #endif
140