1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
20
21 #include <grpc/grpc.h>
22
23 #include "src/core/lib/gprpp/thd.h"
24 #include "test/core/util/test_config.h"
25
// Number of work items each producer thread enqueues, and the number of items
// pushed through the queue by the single-threaded FIFO check.
#define TEST_NUM_ITEMS 10000
27
// Test payload that is passed through the queue. Each item records the index
// it was created with and a flag that is set once the item has been consumed.
struct WorkItem {
  int index;
  bool done;

  // Creates an item tagged with index |i| that has not been consumed yet.
  // Marked explicit so that a plain int never converts silently into a
  // WorkItem.
  explicit WorkItem(int i) : index(i), done(false) {}
};
35
36 // Thread to "produce" items and put items into queue
37 // It will also check that all items has been marked done and clean up all
38 // produced items on destructing.
39 class ProducerThread {
40 public:
ProducerThread(grpc_core::InfLenFIFOQueue * queue,int start_index,int num_items)41 ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index,
42 int num_items)
43 : start_index_(start_index), num_items_(num_items), queue_(queue) {
44 items_ = nullptr;
45 thd_ = grpc_core::Thread(
46 "mpmcq_test_producer_thd",
47 [](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
48 }
~ProducerThread()49 ~ProducerThread() {
50 for (int i = 0; i < num_items_; ++i) {
51 GPR_ASSERT(items_[i]->done);
52 grpc_core::Delete(items_[i]);
53 }
54 gpr_free(items_);
55 }
56
Start()57 void Start() { thd_.Start(); }
Join()58 void Join() { thd_.Join(); }
59
60 private:
Run()61 void Run() {
62 items_ =
63 static_cast<WorkItem**>(gpr_zalloc(num_items_ * sizeof(WorkItem*)));
64 for (int i = 0; i < num_items_; ++i) {
65 items_[i] = grpc_core::New<WorkItem>(start_index_ + i);
66 queue_->Put(items_[i]);
67 }
68 }
69
70 int start_index_;
71 int num_items_;
72 grpc_core::InfLenFIFOQueue* queue_;
73 grpc_core::Thread thd_;
74 WorkItem** items_;
75 };
76
77 // Thread to pull out items from queue
78 class ConsumerThread {
79 public:
ConsumerThread(grpc_core::InfLenFIFOQueue * queue)80 ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) {
81 thd_ = grpc_core::Thread(
82 "mpmcq_test_consumer_thd",
83 [](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
84 }
~ConsumerThread()85 ~ConsumerThread() {}
86
Start()87 void Start() { thd_.Start(); }
Join()88 void Join() { thd_.Join(); }
89
90 private:
Run()91 void Run() {
92 // count number of Get() called in this thread
93 int count = 0;
94
95 WorkItem* item;
96 while ((item = static_cast<WorkItem*>(queue_->Get())) != nullptr) {
97 count++;
98 GPR_ASSERT(!item->done);
99 item->done = true;
100 }
101
102 gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
103 }
104 grpc_core::InfLenFIFOQueue* queue_;
105 grpc_core::Thread thd_;
106 };
107
test_FIFO(void)108 static void test_FIFO(void) {
109 gpr_log(GPR_INFO, "test_FIFO");
110 grpc_core::InfLenFIFOQueue large_queue;
111 for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
112 large_queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
113 }
114 GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
115 for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
116 WorkItem* item = static_cast<WorkItem*>(large_queue.Get());
117 GPR_ASSERT(i == item->index);
118 grpc_core::Delete(item);
119 }
120 }
121
test_many_thread(void)122 static void test_many_thread(void) {
123 gpr_log(GPR_INFO, "test_many_thread");
124 const int num_producer_threads = 10;
125 const int num_consumer_threads = 20;
126 grpc_core::InfLenFIFOQueue queue;
127 ProducerThread** producer_threads = static_cast<ProducerThread**>(
128 gpr_zalloc(num_producer_threads * sizeof(ProducerThread*)));
129 ConsumerThread** consumer_threads = static_cast<ConsumerThread**>(
130 gpr_zalloc(num_consumer_threads * sizeof(ConsumerThread*)));
131
132 gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
133 for (int i = 0; i < num_producer_threads; ++i) {
134 producer_threads[i] = grpc_core::New<ProducerThread>(
135 &queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS);
136 producer_threads[i]->Start();
137 }
138 gpr_log(GPR_DEBUG, "ProducerThreads Started.");
139 gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
140 for (int i = 0; i < num_consumer_threads; ++i) {
141 consumer_threads[i] = grpc_core::New<ConsumerThread>(&queue);
142 consumer_threads[i]->Start();
143 }
144 gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
145 gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
146 for (int i = 0; i < num_producer_threads; ++i) {
147 producer_threads[i]->Join();
148 }
149 gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
150 gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
151 for (int i = 0; i < num_consumer_threads; ++i) {
152 queue.Put(nullptr);
153 }
154 for (int i = 0; i < num_consumer_threads; ++i) {
155 consumer_threads[i]->Join();
156 }
157 gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
158 gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
159 for (int i = 0; i < num_producer_threads; ++i) {
160 // Destructor of ProducerThread will do the check of WorkItems
161 grpc_core::Delete(producer_threads[i]);
162 }
163 gpr_free(producer_threads);
164 for (int i = 0; i < num_consumer_threads; ++i) {
165 grpc_core::Delete(consumer_threads[i]);
166 }
167 gpr_free(consumer_threads);
168 gpr_log(GPR_DEBUG, "Done.");
169 }
170
main(int argc,char ** argv)171 int main(int argc, char** argv) {
172 grpc::testing::TestEnvironment env(argc, argv);
173 grpc_init();
174 test_FIFO();
175 test_many_thread();
176 grpc_shutdown();
177 return 0;
178 }
179