1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include <memory.h>
26 #include <stdio.h>
27 #include <atomic>
28
29 #include <string>
30
31 #include "absl/strings/str_cat.h"
32
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/lib/gprpp/thd.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/iomgr/iomgr.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/tcp_server.h"
43
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46
47 /* TODO(yashykt): When our macos testing infrastructure becomes good enough, we
48 * wouldn't need to reduce the number of threads on MacOS */
49 #ifdef __APPLE__
50 #define NUM_THREADS 10
51 #else
52 #define NUM_THREADS 100
53 #endif /* __APPLE */
54
55 #define NUM_OUTER_LOOPS 10
56 #define NUM_INNER_LOOPS 10
57 #define DELAY_MILLIS 10
58 #define POLL_MILLIS 15000
59
60 #define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
61 #define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
62 #define DELAY_MILLIS_SHORT_TIMEOUTS 1
63 // in a successful test run, POLL_MILLIS should never be reached because all
64 // runs should end after the shorter delay_millis
65 #define POLL_MILLIS_SHORT_TIMEOUTS 30000
66 // it should never take longer that this to shutdown the server
67 #define SERVER_SHUTDOWN_TIMEOUT 30000
68
tag(int n)69 static void* tag(int n) { return (void*)static_cast<uintptr_t>(n); }
detag(void * p)70 static int detag(void* p) { return static_cast<int>((uintptr_t)p); }
71
create_loop_destroy(void * addr)72 void create_loop_destroy(void* addr) {
73 for (int i = 0; i < NUM_OUTER_LOOPS; ++i) {
74 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
75 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
76 nullptr, nullptr);
77
78 for (int j = 0; j < NUM_INNER_LOOPS; ++j) {
79 gpr_timespec later_time =
80 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS);
81 grpc_connectivity_state state =
82 grpc_channel_check_connectivity_state(chan, 1);
83 grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
84 nullptr);
85 gpr_timespec poll_time =
86 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
87 GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type ==
88 GRPC_OP_COMPLETE);
89 /* check that the watcher from "watch state" was free'd */
90 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
91 }
92 grpc_channel_destroy(chan);
93 grpc_completion_queue_destroy(cq);
94 }
95 }
96
97 // Always stack-allocate or new ServerThreadArgs; never use gpr_malloc since
98 // this contains C++ objects.
99 struct ServerThreadArgs {
100 std::string addr;
101 grpc_server* server = nullptr;
102 grpc_completion_queue* cq = nullptr;
103 std::vector<grpc_pollset*> pollset;
104 gpr_mu* mu = nullptr;
105 gpr_event ready;
106 std::atomic_bool stop{false};
107 };
108
server_thread(void * vargs)109 void server_thread(void* vargs) {
110 struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
111 grpc_event ev;
112 gpr_timespec deadline =
113 grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
114 ev = grpc_completion_queue_next(args->cq, deadline, nullptr);
115 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
116 GPR_ASSERT(detag(ev.tag) == 0xd1e);
117 }
118
on_connect(void * vargs,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)119 static void on_connect(void* vargs, grpc_endpoint* tcp,
120 grpc_pollset* /*accepting_pollset*/,
121 grpc_tcp_server_acceptor* acceptor) {
122 gpr_free(acceptor);
123 struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
124 grpc_endpoint_shutdown(tcp,
125 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
126 grpc_endpoint_destroy(tcp);
127 gpr_mu_lock(args->mu);
128 GRPC_LOG_IF_ERROR("pollset_kick",
129 grpc_pollset_kick(args->pollset[0], nullptr));
130 gpr_mu_unlock(args->mu);
131 }
132
bad_server_thread(void * vargs)133 void bad_server_thread(void* vargs) {
134 struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
135
136 grpc_core::ExecCtx exec_ctx;
137 grpc_resolved_address resolved_addr;
138 grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr);
139 int port;
140 grpc_tcp_server* s;
141 grpc_error* error = grpc_tcp_server_create(nullptr, nullptr, &s);
142 GPR_ASSERT(error == GRPC_ERROR_NONE);
143 memset(&resolved_addr, 0, sizeof(resolved_addr));
144 addr->sa_family = GRPC_AF_INET;
145 error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
146 GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
147 GPR_ASSERT(port > 0);
148 args->addr = absl::StrCat("localhost:", port);
149
150 grpc_tcp_server_start(s, &args->pollset, on_connect, args);
151 gpr_event_set(&args->ready, (void*)1);
152
153 gpr_mu_lock(args->mu);
154 while (args->stop.load(std::memory_order_acquire) == false) {
155 grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100;
156
157 grpc_pollset_worker* worker = nullptr;
158 if (!GRPC_LOG_IF_ERROR(
159 "pollset_work",
160 grpc_pollset_work(args->pollset[0], &worker, deadline))) {
161 args->stop.store(true, std::memory_order_release);
162 }
163 gpr_mu_unlock(args->mu);
164
165 gpr_mu_lock(args->mu);
166 }
167 gpr_mu_unlock(args->mu);
168
169 grpc_tcp_server_unref(s);
170 }
171
done_pollset_shutdown(void * pollset,grpc_error *)172 static void done_pollset_shutdown(void* pollset, grpc_error* /*error*/) {
173 grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset));
174 gpr_free(pollset);
175 }
176
run_concurrent_connectivity_test()177 int run_concurrent_connectivity_test() {
178 struct ServerThreadArgs args;
179
180 grpc_init();
181
182 /* First round, no server */
183 {
184 gpr_log(GPR_DEBUG, "Wave 1");
185 grpc_core::Thread threads[NUM_THREADS];
186 args.addr = "localhost:54321";
187 for (auto& th : threads) {
188 th = grpc_core::Thread("grpc_wave_1", create_loop_destroy,
189 const_cast<char*>(args.addr.c_str()));
190 th.Start();
191 }
192 for (auto& th : threads) {
193 th.Join();
194 }
195 }
196
197 {
198 /* Second round, actual grpc server */
199 gpr_log(GPR_DEBUG, "Wave 2");
200 int port = grpc_pick_unused_port_or_die();
201 args.addr = absl::StrCat("localhost:", port);
202 args.server = grpc_server_create(nullptr, nullptr);
203 grpc_server_add_insecure_http2_port(args.server, args.addr.c_str());
204 args.cq = grpc_completion_queue_create_for_next(nullptr);
205 grpc_server_register_completion_queue(args.server, args.cq, nullptr);
206 grpc_server_start(args.server);
207 grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
208 server2.Start();
209
210 grpc_core::Thread threads[NUM_THREADS];
211 for (auto& th : threads) {
212 th = grpc_core::Thread("grpc_wave_2", create_loop_destroy,
213 const_cast<char*>(args.addr.c_str()));
214 th.Start();
215 }
216 for (auto& th : threads) {
217 th.Join();
218 }
219 grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
220
221 server2.Join();
222 grpc_server_destroy(args.server);
223 grpc_completion_queue_destroy(args.cq);
224 }
225
226 {
227 /* Third round, bogus tcp server */
228 gpr_log(GPR_DEBUG, "Wave 3");
229 auto* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
230 grpc_pollset_init(pollset, &args.mu);
231 args.pollset.push_back(pollset);
232 gpr_event_init(&args.ready);
233 grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
234 server3.Start();
235 gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
236
237 grpc_core::Thread threads[NUM_THREADS];
238 for (auto& th : threads) {
239 th = grpc_core::Thread("grpc_wave_3", create_loop_destroy,
240 const_cast<char*>(args.addr.c_str()));
241 th.Start();
242 }
243 for (auto& th : threads) {
244 th.Join();
245 }
246
247 args.stop.store(true, std::memory_order_release);
248 server3.Join();
249 {
250 grpc_core::ExecCtx exec_ctx;
251 grpc_pollset_shutdown(
252 args.pollset[0],
253 GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset[0],
254 grpc_schedule_on_exec_ctx));
255 }
256 }
257
258 grpc_shutdown();
259 return 0;
260 }
261
watches_with_short_timeouts(void * addr)262 void watches_with_short_timeouts(void* addr) {
263 for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) {
264 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
265 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
266 nullptr, nullptr);
267
268 for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) {
269 gpr_timespec later_time =
270 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS);
271 grpc_connectivity_state state =
272 grpc_channel_check_connectivity_state(chan, 0);
273 GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
274 grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
275 nullptr);
276 gpr_timespec poll_time =
277 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS);
278 grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
279 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
280 GPR_ASSERT(ev.success == false);
281 /* check that the watcher from "watch state" was free'd */
282 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
283 }
284 grpc_channel_destroy(chan);
285 grpc_completion_queue_destroy(cq);
286 }
287 }
288
289 // This test tries to catch deadlock situations.
290 // With short timeouts on "watches" and long timeouts on cq next calls,
291 // so that a QUEUE_TIMEOUT likely means that something is stuck.
run_concurrent_watches_with_short_timeouts_test()292 int run_concurrent_watches_with_short_timeouts_test() {
293 grpc_init();
294
295 grpc_core::Thread threads[NUM_THREADS];
296
297 for (auto& th : threads) {
298 th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
299 const_cast<char*>("localhost:54321"));
300 th.Start();
301 }
302 for (auto& th : threads) {
303 th.Join();
304 }
305
306 grpc_shutdown();
307 return 0;
308 }
309
main(int argc,char ** argv)310 int main(int argc, char** argv) {
311 grpc::testing::TestEnvironment env(argc, argv);
312
313 run_concurrent_connectivity_test();
314 run_concurrent_watches_with_short_timeouts_test();
315 }
316