1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
20 
21 #include <grpc/grpc.h>
22 
23 #include "src/core/lib/gprpp/thd.h"
24 #include "test/core/util/test_config.h"
25 
26 #define TEST_NUM_ITEMS 10000
27 
28 // Testing items for queue
29 struct WorkItem {
30   int index;
31   bool done;
32 
WorkItemWorkItem33   WorkItem(int i) : index(i) { done = false; }
34 };
35 
36 // Thread to "produce" items and put items into queue
37 // It will also check that all items has been marked done and clean up all
38 // produced items on destructing.
39 class ProducerThread {
40  public:
ProducerThread(grpc_core::InfLenFIFOQueue * queue,int start_index,int num_items)41   ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index,
42                  int num_items)
43       : start_index_(start_index), num_items_(num_items), queue_(queue) {
44     items_ = nullptr;
45     thd_ = grpc_core::Thread(
46         "mpmcq_test_producer_thd",
47         [](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
48   }
~ProducerThread()49   ~ProducerThread() {
50     for (int i = 0; i < num_items_; ++i) {
51       GPR_ASSERT(items_[i]->done);
52       delete items_[i];
53     }
54     gpr_free(items_);
55   }
56 
Start()57   void Start() { thd_.Start(); }
Join()58   void Join() { thd_.Join(); }
59 
60  private:
Run()61   void Run() {
62     items_ =
63         static_cast<WorkItem**>(gpr_zalloc(num_items_ * sizeof(WorkItem*)));
64     for (int i = 0; i < num_items_; ++i) {
65       items_[i] = new WorkItem(start_index_ + i);
66       queue_->Put(items_[i]);
67     }
68   }
69 
70   int start_index_;
71   int num_items_;
72   grpc_core::InfLenFIFOQueue* queue_;
73   grpc_core::Thread thd_;
74   WorkItem** items_;
75 };
76 
77 // Thread to pull out items from queue
78 class ConsumerThread {
79  public:
ConsumerThread(grpc_core::InfLenFIFOQueue * queue)80   ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) {
81     thd_ = grpc_core::Thread(
82         "mpmcq_test_consumer_thd",
83         [](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
84   }
~ConsumerThread()85   ~ConsumerThread() {}
86 
Start()87   void Start() { thd_.Start(); }
Join()88   void Join() { thd_.Join(); }
89 
90  private:
Run()91   void Run() {
92     // count number of Get() called in this thread
93     int count = 0;
94 
95     WorkItem* item;
96     while ((item = static_cast<WorkItem*>(queue_->Get())) != nullptr) {
97       count++;
98       GPR_ASSERT(!item->done);
99       item->done = true;
100     }
101 
102     gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
103   }
104   grpc_core::InfLenFIFOQueue* queue_;
105   grpc_core::Thread thd_;
106 };
107 
test_FIFO(void)108 static void test_FIFO(void) {
109   gpr_log(GPR_INFO, "test_FIFO");
110   grpc_core::InfLenFIFOQueue large_queue;
111   for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
112     large_queue.Put(static_cast<void*>(new WorkItem(i)));
113   }
114   GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
115   for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
116     WorkItem* item = static_cast<WorkItem*>(large_queue.Get());
117     GPR_ASSERT(i == item->index);
118     delete item;
119   }
120 }
121 
122 // Test if queue's behavior of expanding is correct. (Only does expansion when
123 // it gets full, and each time expands to doubled size).
test_space_efficiency(void)124 static void test_space_efficiency(void) {
125   gpr_log(GPR_INFO, "test_space_efficiency");
126   grpc_core::InfLenFIFOQueue queue;
127   for (int i = 0; i < queue.init_num_nodes(); ++i) {
128     queue.Put(static_cast<void*>(new WorkItem(i)));
129   }
130   // Queue should not have been expanded at this time.
131   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
132   for (int i = 0; i < queue.init_num_nodes(); ++i) {
133     WorkItem* item = static_cast<WorkItem*>(queue.Get());
134     queue.Put(item);
135   }
136   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
137   for (int i = 0; i < queue.init_num_nodes(); ++i) {
138     WorkItem* item = static_cast<WorkItem*>(queue.Get());
139     delete item;
140   }
141   // Queue never shrinks even it is empty.
142   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
143   GPR_ASSERT(queue.count() == 0);
144   // queue empty now
145   for (int i = 0; i < queue.init_num_nodes() * 2; ++i) {
146     queue.Put(static_cast<void*>(new WorkItem(i)));
147   }
148   GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2);
149   // Queue should have been expanded once.
150   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
151   for (int i = 0; i < queue.init_num_nodes(); ++i) {
152     WorkItem* item = static_cast<WorkItem*>(queue.Get());
153     delete item;
154   }
155   GPR_ASSERT(queue.count() == queue.init_num_nodes());
156   // Queue will never shrink, should keep same number of node as before.
157   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
158   for (int i = 0; i < queue.init_num_nodes() + 1; ++i) {
159     queue.Put(static_cast<void*>(new WorkItem(i)));
160   }
161   GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1);
162   // Queue should have been expanded twice.
163   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
164   for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) {
165     WorkItem* item = static_cast<WorkItem*>(queue.Get());
166     delete item;
167   }
168   GPR_ASSERT(queue.count() == 0);
169   GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
170   gpr_log(GPR_DEBUG, "Done.");
171 }
172 
test_many_thread(void)173 static void test_many_thread(void) {
174   gpr_log(GPR_INFO, "test_many_thread");
175   const int num_producer_threads = 10;
176   const int num_consumer_threads = 20;
177   grpc_core::InfLenFIFOQueue queue;
178   ProducerThread** producer_threads = static_cast<ProducerThread**>(
179       gpr_zalloc(num_producer_threads * sizeof(ProducerThread*)));
180   ConsumerThread** consumer_threads = static_cast<ConsumerThread**>(
181       gpr_zalloc(num_consumer_threads * sizeof(ConsumerThread*)));
182 
183   gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
184   for (int i = 0; i < num_producer_threads; ++i) {
185     producer_threads[i] =
186         new ProducerThread(&queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS);
187     producer_threads[i]->Start();
188   }
189   gpr_log(GPR_DEBUG, "ProducerThreads Started.");
190   gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
191   for (int i = 0; i < num_consumer_threads; ++i) {
192     consumer_threads[i] = new ConsumerThread(&queue);
193     consumer_threads[i]->Start();
194   }
195   gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
196   gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
197   for (int i = 0; i < num_producer_threads; ++i) {
198     producer_threads[i]->Join();
199   }
200   gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
201   gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
202   for (int i = 0; i < num_consumer_threads; ++i) {
203     queue.Put(nullptr);
204   }
205   for (int i = 0; i < num_consumer_threads; ++i) {
206     consumer_threads[i]->Join();
207   }
208   gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
209   gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
210   for (int i = 0; i < num_producer_threads; ++i) {
211     // Destructor of ProducerThread will do the check of WorkItems
212     delete producer_threads[i];
213   }
214   gpr_free(producer_threads);
215   for (int i = 0; i < num_consumer_threads; ++i) {
216     delete consumer_threads[i];
217   }
218   gpr_free(consumer_threads);
219   gpr_log(GPR_DEBUG, "Done.");
220 }
221 
main(int argc,char ** argv)222 int main(int argc, char** argv) {
223   grpc::testing::TestEnvironment env(argc, argv);
224   grpc_init();
225   test_FIFO();
226   test_space_efficiency();
227   test_many_thread();
228   grpc_shutdown();
229   return 0;
230 }
231