1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
20 
21 #include <grpc/grpc.h>
22 
23 #include "src/core/lib/gprpp/thd.h"
24 #include "test/core/util/test_config.h"
25 
// Number of WorkItems pushed through the queue by test_FIFO() and by each
// ProducerThread in test_many_thread().
#define TEST_NUM_ITEMS 10000
27 
// Test payload that is passed through the queue.
//   index - identifier supplied by the creator of the item.
//   done  - false on construction; set to true by whoever consumes the item.
struct WorkItem {
  int index;
  bool done;

  // `explicit` keeps a plain int from silently converting into a WorkItem.
  // Both members are initialised in the member-init list so the object is
  // never observable in a half-initialised state.
  explicit WorkItem(int i) : index(i), done(false) {}
};
35 
36 // Thread to "produce" items and put items into queue
37 // It will also check that all items has been marked done and clean up all
38 // produced items on destructing.
39 class ProducerThread {
40  public:
ProducerThread(grpc_core::InfLenFIFOQueue * queue,int start_index,int num_items)41   ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index,
42                  int num_items)
43       : start_index_(start_index), num_items_(num_items), queue_(queue) {
44     items_ = nullptr;
45     thd_ = grpc_core::Thread(
46         "mpmcq_test_producer_thd",
47         [](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
48   }
~ProducerThread()49   ~ProducerThread() {
50     for (int i = 0; i < num_items_; ++i) {
51       GPR_ASSERT(items_[i]->done);
52       grpc_core::Delete(items_[i]);
53     }
54     gpr_free(items_);
55   }
56 
Start()57   void Start() { thd_.Start(); }
Join()58   void Join() { thd_.Join(); }
59 
60  private:
Run()61   void Run() {
62     items_ =
63         static_cast<WorkItem**>(gpr_zalloc(num_items_ * sizeof(WorkItem*)));
64     for (int i = 0; i < num_items_; ++i) {
65       items_[i] = grpc_core::New<WorkItem>(start_index_ + i);
66       queue_->Put(items_[i]);
67     }
68   }
69 
70   int start_index_;
71   int num_items_;
72   grpc_core::InfLenFIFOQueue* queue_;
73   grpc_core::Thread thd_;
74   WorkItem** items_;
75 };
76 
77 // Thread to pull out items from queue
78 class ConsumerThread {
79  public:
ConsumerThread(grpc_core::InfLenFIFOQueue * queue)80   ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) {
81     thd_ = grpc_core::Thread(
82         "mpmcq_test_consumer_thd",
83         [](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
84   }
~ConsumerThread()85   ~ConsumerThread() {}
86 
Start()87   void Start() { thd_.Start(); }
Join()88   void Join() { thd_.Join(); }
89 
90  private:
Run()91   void Run() {
92     // count number of Get() called in this thread
93     int count = 0;
94 
95     WorkItem* item;
96     while ((item = static_cast<WorkItem*>(queue_->Get())) != nullptr) {
97       count++;
98       GPR_ASSERT(!item->done);
99       item->done = true;
100     }
101 
102     gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
103   }
104   grpc_core::InfLenFIFOQueue* queue_;
105   grpc_core::Thread thd_;
106 };
107 
test_FIFO(void)108 static void test_FIFO(void) {
109   gpr_log(GPR_INFO, "test_FIFO");
110   grpc_core::InfLenFIFOQueue large_queue;
111   for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
112     large_queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
113   }
114   GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
115   for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
116     WorkItem* item = static_cast<WorkItem*>(large_queue.Get());
117     GPR_ASSERT(i == item->index);
118     grpc_core::Delete(item);
119   }
120 }
121 
test_many_thread(void)122 static void test_many_thread(void) {
123   gpr_log(GPR_INFO, "test_many_thread");
124   const int num_producer_threads = 10;
125   const int num_consumer_threads = 20;
126   grpc_core::InfLenFIFOQueue queue;
127   ProducerThread** producer_threads = static_cast<ProducerThread**>(
128       gpr_zalloc(num_producer_threads * sizeof(ProducerThread*)));
129   ConsumerThread** consumer_threads = static_cast<ConsumerThread**>(
130       gpr_zalloc(num_consumer_threads * sizeof(ConsumerThread*)));
131 
132   gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
133   for (int i = 0; i < num_producer_threads; ++i) {
134     producer_threads[i] = grpc_core::New<ProducerThread>(
135         &queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS);
136     producer_threads[i]->Start();
137   }
138   gpr_log(GPR_DEBUG, "ProducerThreads Started.");
139   gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
140   for (int i = 0; i < num_consumer_threads; ++i) {
141     consumer_threads[i] = grpc_core::New<ConsumerThread>(&queue);
142     consumer_threads[i]->Start();
143   }
144   gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
145   gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
146   for (int i = 0; i < num_producer_threads; ++i) {
147     producer_threads[i]->Join();
148   }
149   gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
150   gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
151   for (int i = 0; i < num_consumer_threads; ++i) {
152     queue.Put(nullptr);
153   }
154   for (int i = 0; i < num_consumer_threads; ++i) {
155     consumer_threads[i]->Join();
156   }
157   gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
158   gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
159   for (int i = 0; i < num_producer_threads; ++i) {
160     // Destructor of ProducerThread will do the check of WorkItems
161     grpc_core::Delete(producer_threads[i]);
162   }
163   gpr_free(producer_threads);
164   for (int i = 0; i < num_consumer_threads; ++i) {
165     grpc_core::Delete(consumer_threads[i]);
166   }
167   gpr_free(consumer_threads);
168   gpr_log(GPR_DEBUG, "Done.");
169 }
170 
main(int argc,char ** argv)171 int main(int argc, char** argv) {
172   grpc::testing::TestEnvironment env(argc, argv);
173   grpc_init();
174   test_FIFO();
175   test_many_thread();
176   grpc_shutdown();
177   return 0;
178 }
179