1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/log.h>
20 #include <grpc/support/sync.h>
21 #include <grpc/support/time.h>
22 
23 #include "src/core/lib/gprpp/thd.h"
24 #include "test/core/end2end/cq_verifier.h"
25 #include "test/core/end2end/end2end_tests.h"
26 
/* Converts an integer id into the opaque pointer form used for completion
   queue tags in this test. */
static void* tag(intptr_t t) {
  void* const as_pointer = reinterpret_cast<void*>(t);
  return as_pointer;
}
28 
29 typedef struct {
30   gpr_event started;
31   grpc_channel* channel;
32   grpc_completion_queue* cq;
33 } child_events;
34 
35 struct CallbackContext {
36   grpc_completion_queue_functor functor;
37   gpr_event finished;
CallbackContextCallbackContext38   explicit CallbackContext(void (*cb)(grpc_completion_queue_functor* functor,
39                                       int success)) {
40     functor.functor_run = cb;
41     functor.inlineable = false;
42     gpr_event_init(&finished);
43   }
44 };
45 
child_thread(void * arg)46 static void child_thread(void* arg) {
47   child_events* ce = static_cast<child_events*>(arg);
48   grpc_event ev;
49   gpr_event_set(&ce->started, reinterpret_cast<void*>(1));
50   gpr_log(GPR_DEBUG, "verifying");
51   ev = grpc_completion_queue_next(ce->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC),
52                                   nullptr);
53   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
54   GPR_ASSERT(ev.tag == tag(1));
55   GPR_ASSERT(ev.success == 0);
56 }
57 
test_connectivity(grpc_end2end_test_config config)58 static void test_connectivity(grpc_end2end_test_config config) {
59   grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr);
60   grpc_connectivity_state state;
61   cq_verifier* cqv = cq_verifier_create(f.cq);
62   child_events ce;
63 
64   grpc_channel_args client_args;
65   grpc_arg arg_array[1];
66   arg_array[0].type = GRPC_ARG_INTEGER;
67   arg_array[0].key =
68       const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
69   arg_array[0].value.integer = 1000;
70   client_args.args = arg_array;
71   client_args.num_args = 1;
72 
73   config.init_client(&f, &client_args);
74 
75   ce.channel = f.client;
76   ce.cq = f.cq;
77   gpr_event_init(&ce.started);
78   grpc_core::Thread thd("grpc_connectivity", child_thread, &ce);
79   thd.Start();
80 
81   gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC));
82 
83   /* channels should start life in IDLE, and stay there */
84   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
85              GRPC_CHANNEL_IDLE);
86   gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
87   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
88              GRPC_CHANNEL_IDLE);
89 
90   /* start watching for a change */
91   gpr_log(GPR_DEBUG, "watching");
92   grpc_channel_watch_connectivity_state(
93       f.client, GRPC_CHANNEL_IDLE, gpr_now(GPR_CLOCK_MONOTONIC), f.cq, tag(1));
94 
95   /* eventually the child thread completion should trigger */
96   thd.Join();
97 
98   /* check that we're still in idle, and start connecting */
99   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) ==
100              GRPC_CHANNEL_IDLE);
101   /* start watching for a change */
102   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
103                                         grpc_timeout_seconds_to_deadline(3),
104                                         f.cq, tag(2));
105 
106   /* and now the watch should trigger */
107   CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
108   cq_verify(cqv);
109   state = grpc_channel_check_connectivity_state(f.client, 0);
110   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
111              state == GRPC_CHANNEL_CONNECTING);
112 
113   /* quickly followed by a transition to TRANSIENT_FAILURE */
114   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_CONNECTING,
115                                         grpc_timeout_seconds_to_deadline(3),
116                                         f.cq, tag(3));
117   CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
118   cq_verify(cqv);
119   state = grpc_channel_check_connectivity_state(f.client, 0);
120   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
121              state == GRPC_CHANNEL_CONNECTING);
122 
123   gpr_log(GPR_DEBUG, "*** STARTING SERVER ***");
124 
125   /* now let's bring up a server to connect to */
126   config.init_server(&f, nullptr);
127 
128   gpr_log(GPR_DEBUG, "*** STARTED SERVER ***");
129 
130   /* we'll go through some set of transitions (some might be missed), until
131      READY is reached */
132   while (state != GRPC_CHANNEL_READY) {
133     grpc_channel_watch_connectivity_state(
134         f.client, state, grpc_timeout_seconds_to_deadline(3), f.cq, tag(4));
135     CQ_EXPECT_COMPLETION(cqv, tag(4), 1);
136     cq_verify(cqv);
137     state = grpc_channel_check_connectivity_state(f.client, 0);
138     GPR_ASSERT(state == GRPC_CHANNEL_READY ||
139                state == GRPC_CHANNEL_CONNECTING ||
140                state == GRPC_CHANNEL_TRANSIENT_FAILURE);
141   }
142 
143   /* bring down the server again */
144   /* we should go immediately to TRANSIENT_FAILURE */
145   gpr_log(GPR_DEBUG, "*** SHUTTING DOWN SERVER ***");
146 
147   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_READY,
148                                         grpc_timeout_seconds_to_deadline(3),
149                                         f.cq, tag(5));
150 
151   grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
152 
153   CQ_EXPECT_COMPLETION(cqv, tag(5), 1);
154   CQ_EXPECT_COMPLETION(cqv, tag(0xdead), 1);
155   cq_verify(cqv);
156   state = grpc_channel_check_connectivity_state(f.client, 0);
157   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
158              state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_IDLE);
159 
160   /* cleanup server */
161   grpc_server_destroy(f.server);
162 
163   gpr_log(GPR_DEBUG, "*** SHUTDOWN SERVER ***");
164 
165   grpc_channel_destroy(f.client);
166   grpc_completion_queue_shutdown(f.cq);
167   grpc_completion_queue_destroy(f.cq);
168 
169   /* shutdown_cq is not used in this test */
170   grpc_completion_queue_destroy(f.shutdown_cq);
171   config.tear_down_data(&f);
172 
173   cq_verifier_destroy(cqv);
174 }
175 
cb_watch_connectivity(grpc_completion_queue_functor * functor,int success)176 static void cb_watch_connectivity(grpc_completion_queue_functor* functor,
177                                   int success) {
178   CallbackContext* cb_ctx = reinterpret_cast<CallbackContext*>(functor);
179 
180   gpr_log(GPR_DEBUG, "cb_watch_connectivity called, verifying");
181 
182   /* callback must not have errors */
183   GPR_ASSERT(success != 0);
184 
185   gpr_event_set(&cb_ctx->finished, reinterpret_cast<void*>(1));
186 }
187 
cb_shutdown(grpc_completion_queue_functor * functor,int)188 static void cb_shutdown(grpc_completion_queue_functor* functor,
189                         int /*success*/) {
190   CallbackContext* cb_ctx = reinterpret_cast<CallbackContext*>(functor);
191 
192   gpr_log(GPR_DEBUG, "cb_shutdown called, nothing to do");
193   gpr_event_set(&cb_ctx->finished, reinterpret_cast<void*>(1));
194 }
195 
test_watch_connectivity_cq_callback(grpc_end2end_test_config config)196 static void test_watch_connectivity_cq_callback(
197     grpc_end2end_test_config config) {
198   CallbackContext cb_ctx(cb_watch_connectivity);
199   CallbackContext cb_shutdown_ctx(cb_shutdown);
200   grpc_completion_queue* cq;
201   grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr);
202 
203   config.init_client(&f, nullptr);
204 
205   /* start connecting */
206   grpc_channel_check_connectivity_state(f.client, 1);
207 
208   /* create the cq callback */
209   cq = grpc_completion_queue_create_for_callback(&cb_shutdown_ctx.functor,
210                                                  nullptr);
211 
212   /* start watching for any change, cb is immediately called
213    * and no dead lock should be raised */
214   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
215                                         grpc_timeout_seconds_to_deadline(3), cq,
216                                         &cb_ctx.functor);
217 
218   /* we just check that the callback was executed once notifying a connection
219    * transition */
220   GPR_ASSERT(gpr_event_wait(&cb_ctx.finished,
221                             gpr_inf_future(GPR_CLOCK_MONOTONIC)) != nullptr);
222 
223   /* shutdown, since shutdown cb might be executed in a background thread
224    * we actively wait till is executed. */
225   grpc_completion_queue_shutdown(cq);
226   gpr_event_wait(&cb_shutdown_ctx.finished,
227                  gpr_inf_future(GPR_CLOCK_MONOTONIC));
228 
229   /* cleanup */
230   grpc_channel_destroy(f.client);
231   grpc_completion_queue_destroy(cq);
232 
233   /* shutdown_cq and cq are not used in this test */
234   grpc_completion_queue_destroy(f.cq);
235   grpc_completion_queue_destroy(f.shutdown_cq);
236 
237   config.tear_down_data(&f);
238 }
239 
connectivity(grpc_end2end_test_config config)240 void connectivity(grpc_end2end_test_config config) {
241   GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
242   test_connectivity(config);
243   test_watch_connectivity_cq_callback(config);
244 }
245 
/* This test performs no pre-initialization, so the body is empty. */
void connectivity_pre_init() {}
247