1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 
25 #include <memory.h>
26 #include <stdio.h>
27 #include <atomic>
28 
29 #include <string>
30 
31 #include "absl/strings/str_cat.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/lib/gprpp/thd.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/iomgr/iomgr.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/tcp_server.h"
43 
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 
/* TODO(yashykt): When our macos testing infrastructure becomes good enough, we
 * wouldn't need to reduce the number of threads on MacOS */
/* Number of concurrent client threads started in each test wave. */
#ifdef __APPLE__
#define NUM_THREADS 10
#else
#define NUM_THREADS 100
#endif /* __APPLE__ */

/* Parameters for create_loop_destroy: channels created per thread
   (NUM_OUTER_LOOPS), watches issued per channel (NUM_INNER_LOOPS), the watch
   deadline (DELAY_MILLIS) and the completion-queue poll deadline
   (POLL_MILLIS), both in milliseconds. */
#define NUM_OUTER_LOOPS 10
#define NUM_INNER_LOOPS 10
#define DELAY_MILLIS 10
#define POLL_MILLIS 15000

/* Same parameters for watches_with_short_timeouts. */
#define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
#define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
#define DELAY_MILLIS_SHORT_TIMEOUTS 1
// In a successful test run, POLL_MILLIS_SHORT_TIMEOUTS should never be reached
// because every watch should complete after the much shorter
// DELAY_MILLIS_SHORT_TIMEOUTS.
#define POLL_MILLIS_SHORT_TIMEOUTS 30000
// It should never take longer than this (in milliseconds) to shut down the
// server.
#define SERVER_SHUTDOWN_TIMEOUT 30000
68 
/* Encodes the integer `n` as an opaque pointer-sized value so it can be passed
   through APIs that take a `void*` tag. Inverse of detag() for values that
   fit in an int. */
static void* tag(int n) {
  const uintptr_t bits = static_cast<uintptr_t>(n);
  return reinterpret_cast<void*>(bits);
}
/* Recovers the integer that was packed into the opaque pointer `p` (the
   pointer's numeric value narrowed to int). */
static int detag(void* p) {
  const uintptr_t bits = reinterpret_cast<uintptr_t>(p);
  return static_cast<int>(bits);
}
71 
create_loop_destroy(void * addr)72 void create_loop_destroy(void* addr) {
73   for (int i = 0; i < NUM_OUTER_LOOPS; ++i) {
74     grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
75     grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
76                                                       nullptr, nullptr);
77 
78     for (int j = 0; j < NUM_INNER_LOOPS; ++j) {
79       gpr_timespec later_time =
80           grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS);
81       grpc_connectivity_state state =
82           grpc_channel_check_connectivity_state(chan, 1);
83       grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
84                                             nullptr);
85       gpr_timespec poll_time =
86           grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
87       GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type ==
88                  GRPC_OP_COMPLETE);
89       /* check that the watcher from "watch state" was free'd */
90       GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
91     }
92     grpc_channel_destroy(chan);
93     grpc_completion_queue_destroy(cq);
94   }
95 }
96 
97 // Always stack-allocate or new ServerThreadArgs; never use gpr_malloc since
98 // this contains C++ objects.
99 struct ServerThreadArgs {
100   std::string addr;
101   grpc_server* server = nullptr;
102   grpc_completion_queue* cq = nullptr;
103   std::vector<grpc_pollset*> pollset;
104   gpr_mu* mu = nullptr;
105   gpr_event ready;
106   std::atomic_bool stop{false};
107 };
108 
server_thread(void * vargs)109 void server_thread(void* vargs) {
110   struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
111   grpc_event ev;
112   gpr_timespec deadline =
113       grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
114   ev = grpc_completion_queue_next(args->cq, deadline, nullptr);
115   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
116   GPR_ASSERT(detag(ev.tag) == 0xd1e);
117 }
118 
on_connect(void * vargs,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)119 static void on_connect(void* vargs, grpc_endpoint* tcp,
120                        grpc_pollset* /*accepting_pollset*/,
121                        grpc_tcp_server_acceptor* acceptor) {
122   gpr_free(acceptor);
123   struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
124   grpc_endpoint_shutdown(tcp,
125                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
126   grpc_endpoint_destroy(tcp);
127   gpr_mu_lock(args->mu);
128   GRPC_LOG_IF_ERROR("pollset_kick",
129                     grpc_pollset_kick(args->pollset[0], nullptr));
130   gpr_mu_unlock(args->mu);
131 }
132 
/* Thread body for the "bogus" TCP server. `vargs` is a ServerThreadArgs*.
 *
 * Creates a raw grpc_tcp_server, adds a port on a zeroed AF_INET address,
 * publishes the resulting "localhost:<port>" string in args->addr, starts the
 * server with on_connect as the accept callback, and signals args->ready.
 * It then polls args->pollset[0] in a loop until args->stop becomes true (or
 * polling reports an error), and finally drops its reference to the server.
 *
 * Expects args->pollset[0] and args->mu to have been initialised by the
 * caller before this thread starts. */
void bad_server_thread(void* vargs) {
  struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);

  grpc_core::ExecCtx exec_ctx;
  grpc_resolved_address resolved_addr;
  /* `addr` aliases the raw address storage inside `resolved_addr`; it is only
     written through after the memset below. */
  grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr);
  int port;
  grpc_tcp_server* s;
  grpc_error* error = grpc_tcp_server_create(nullptr, nullptr, &s);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  /* Zero the whole address (so the port field is 0) and set only the family;
     the port actually bound is reported back through `port`. */
  memset(&resolved_addr, 0, sizeof(resolved_addr));
  addr->sa_family = GRPC_AF_INET;
  error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
  GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
  GPR_ASSERT(port > 0);
  /* Publish the target before signalling `ready`, so the waiter reads a
     fully-formed address. */
  args->addr = absl::StrCat("localhost:", port);

  grpc_tcp_server_start(s, &args->pollset, on_connect, args);
  gpr_event_set(&args->ready, (void*)1);

  /* Poll with args->mu held, in rounds whose deadline is Now() + 100
     (grpc_millis), until asked to stop. */
  gpr_mu_lock(args->mu);
  while (args->stop.load(std::memory_order_acquire) == false) {
    grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100;

    grpc_pollset_worker* worker = nullptr;
    if (!GRPC_LOG_IF_ERROR(
            "pollset_work",
            grpc_pollset_work(args->pollset[0], &worker, deadline))) {
      /* Polling failed: request our own exit instead of spinning on errors. */
      args->stop.store(true, std::memory_order_release);
    }
    /* Release and re-acquire the mutex between rounds -- presumably to give
       other threads (e.g. on_connect, which locks it to kick the pollset) a
       chance to take it. */
    gpr_mu_unlock(args->mu);

    gpr_mu_lock(args->mu);
  }
  gpr_mu_unlock(args->mu);

  grpc_tcp_server_unref(s);
}
171 
done_pollset_shutdown(void * pollset,grpc_error *)172 static void done_pollset_shutdown(void* pollset, grpc_error* /*error*/) {
173   grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset));
174   gpr_free(pollset);
175 }
176 
run_concurrent_connectivity_test()177 int run_concurrent_connectivity_test() {
178   struct ServerThreadArgs args;
179 
180   grpc_init();
181 
182   /* First round, no server */
183   {
184     gpr_log(GPR_DEBUG, "Wave 1");
185     grpc_core::Thread threads[NUM_THREADS];
186     args.addr = "localhost:54321";
187     for (auto& th : threads) {
188       th = grpc_core::Thread("grpc_wave_1", create_loop_destroy,
189                              const_cast<char*>(args.addr.c_str()));
190       th.Start();
191     }
192     for (auto& th : threads) {
193       th.Join();
194     }
195   }
196 
197   {
198     /* Second round, actual grpc server */
199     gpr_log(GPR_DEBUG, "Wave 2");
200     int port = grpc_pick_unused_port_or_die();
201     args.addr = absl::StrCat("localhost:", port);
202     args.server = grpc_server_create(nullptr, nullptr);
203     grpc_server_add_insecure_http2_port(args.server, args.addr.c_str());
204     args.cq = grpc_completion_queue_create_for_next(nullptr);
205     grpc_server_register_completion_queue(args.server, args.cq, nullptr);
206     grpc_server_start(args.server);
207     grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
208     server2.Start();
209 
210     grpc_core::Thread threads[NUM_THREADS];
211     for (auto& th : threads) {
212       th = grpc_core::Thread("grpc_wave_2", create_loop_destroy,
213                              const_cast<char*>(args.addr.c_str()));
214       th.Start();
215     }
216     for (auto& th : threads) {
217       th.Join();
218     }
219     grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
220 
221     server2.Join();
222     grpc_server_destroy(args.server);
223     grpc_completion_queue_destroy(args.cq);
224   }
225 
226   {
227     /* Third round, bogus tcp server */
228     gpr_log(GPR_DEBUG, "Wave 3");
229     auto* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
230     grpc_pollset_init(pollset, &args.mu);
231     args.pollset.push_back(pollset);
232     gpr_event_init(&args.ready);
233     grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
234     server3.Start();
235     gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
236 
237     grpc_core::Thread threads[NUM_THREADS];
238     for (auto& th : threads) {
239       th = grpc_core::Thread("grpc_wave_3", create_loop_destroy,
240                              const_cast<char*>(args.addr.c_str()));
241       th.Start();
242     }
243     for (auto& th : threads) {
244       th.Join();
245     }
246 
247     args.stop.store(true, std::memory_order_release);
248     server3.Join();
249     {
250       grpc_core::ExecCtx exec_ctx;
251       grpc_pollset_shutdown(
252           args.pollset[0],
253           GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset[0],
254                               grpc_schedule_on_exec_ctx));
255     }
256   }
257 
258   grpc_shutdown();
259   return 0;
260 }
261 
watches_with_short_timeouts(void * addr)262 void watches_with_short_timeouts(void* addr) {
263   for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) {
264     grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
265     grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
266                                                       nullptr, nullptr);
267 
268     for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) {
269       gpr_timespec later_time =
270           grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS);
271       grpc_connectivity_state state =
272           grpc_channel_check_connectivity_state(chan, 0);
273       GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
274       grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
275                                             nullptr);
276       gpr_timespec poll_time =
277           grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS);
278       grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
279       GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
280       GPR_ASSERT(ev.success == false);
281       /* check that the watcher from "watch state" was free'd */
282       GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
283     }
284     grpc_channel_destroy(chan);
285     grpc_completion_queue_destroy(cq);
286   }
287 }
288 
289 // This test tries to catch deadlock situations.
290 // With short timeouts on "watches" and long timeouts on cq next calls,
291 // so that a QUEUE_TIMEOUT likely means that something is stuck.
run_concurrent_watches_with_short_timeouts_test()292 int run_concurrent_watches_with_short_timeouts_test() {
293   grpc_init();
294 
295   grpc_core::Thread threads[NUM_THREADS];
296 
297   for (auto& th : threads) {
298     th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
299                            const_cast<char*>("localhost:54321"));
300     th.Start();
301   }
302   for (auto& th : threads) {
303     th.Join();
304   }
305 
306   grpc_shutdown();
307   return 0;
308 }
309 
main(int argc,char ** argv)310 int main(int argc, char** argv) {
311   grpc::testing::TestEnvironment env(argc, argv);
312 
313   run_concurrent_connectivity_test();
314   run_concurrent_watches_with_short_timeouts_test();
315 }
316