1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/end2end/end2end_tests.h"
20 
21 #include <stdbool.h>
22 #include <stdio.h>
23 #include <string.h>
24 
25 #include <grpc/byte_buffer.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include "src/core/lib/channel/channel_stack_builder.h"
30 #include "src/core/lib/surface/channel_init.h"
31 #include "test/core/end2end/cq_verifier.h"
32 
33 static bool g_enable_filter = false;
34 
tag(intptr_t t)35 static void* tag(intptr_t t) { return (void*)t; }
36 
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)37 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
38                                             const char* test_name,
39                                             grpc_channel_args* client_args,
40                                             grpc_channel_args* server_args) {
41   grpc_end2end_test_fixture f;
42   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
43   f = config.create_fixture(client_args, server_args);
44   config.init_server(&f, server_args);
45   config.init_client(&f, client_args);
46   return f;
47 }
48 
n_seconds_from_now(int n)49 static gpr_timespec n_seconds_from_now(int n) {
50   return grpc_timeout_seconds_to_deadline(n);
51 }
52 
five_seconds_from_now(void)53 static gpr_timespec five_seconds_from_now(void) {
54   return n_seconds_from_now(5);
55 }
56 
drain_cq(grpc_completion_queue * cq)57 static void drain_cq(grpc_completion_queue* cq) {
58   grpc_event ev;
59   do {
60     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
61   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
62 }
63 
shutdown_server(grpc_end2end_test_fixture * f)64 static void shutdown_server(grpc_end2end_test_fixture* f) {
65   if (!f->server) return;
66   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
67   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
68                                          grpc_timeout_seconds_to_deadline(5),
69                                          nullptr)
70                  .type == GRPC_OP_COMPLETE);
71   grpc_server_destroy(f->server);
72   f->server = nullptr;
73 }
74 
shutdown_client(grpc_end2end_test_fixture * f)75 static void shutdown_client(grpc_end2end_test_fixture* f) {
76   if (!f->client) return;
77   grpc_channel_destroy(f->client);
78   f->client = nullptr;
79 }
80 
end_test(grpc_end2end_test_fixture * f)81 static void end_test(grpc_end2end_test_fixture* f) {
82   shutdown_server(f);
83   shutdown_client(f);
84 
85   grpc_completion_queue_shutdown(f->cq);
86   drain_cq(f->cq);
87   grpc_completion_queue_destroy(f->cq);
88   grpc_completion_queue_destroy(f->shutdown_cq);
89 }
90 
91 /* Simple request via a server filter that always closes the stream.*/
test_request(grpc_end2end_test_config config)92 static void test_request(grpc_end2end_test_config config) {
93   grpc_call* c;
94   grpc_call* s;
95   grpc_slice request_payload_slice =
96       grpc_slice_from_copied_string("hello world");
97   grpc_byte_buffer* request_payload =
98       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
99   grpc_end2end_test_fixture f =
100       begin_test(config, "filter_causes_close", nullptr, nullptr);
101   cq_verifier* cqv = cq_verifier_create(f.cq);
102   grpc_op ops[6];
103   grpc_op* op;
104   grpc_metadata_array initial_metadata_recv;
105   grpc_metadata_array trailing_metadata_recv;
106   grpc_metadata_array request_metadata_recv;
107   grpc_byte_buffer* request_payload_recv = nullptr;
108   grpc_call_details call_details;
109   grpc_status_code status;
110   grpc_call_error error;
111   grpc_slice details;
112 
113   gpr_timespec deadline = five_seconds_from_now();
114   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
115                                grpc_slice_from_static_string("/foo"), nullptr,
116                                deadline, nullptr);
117   GPR_ASSERT(c);
118 
119   grpc_metadata_array_init(&initial_metadata_recv);
120   grpc_metadata_array_init(&trailing_metadata_recv);
121   grpc_metadata_array_init(&request_metadata_recv);
122   grpc_call_details_init(&call_details);
123 
124   memset(ops, 0, sizeof(ops));
125   op = ops;
126   op->op = GRPC_OP_SEND_INITIAL_METADATA;
127   op->data.send_initial_metadata.count = 0;
128   op->data.send_initial_metadata.metadata = nullptr;
129   op->flags = 0;
130   op->reserved = nullptr;
131   op++;
132   op->op = GRPC_OP_SEND_MESSAGE;
133   op->data.send_message.send_message = request_payload;
134   op->flags = 0;
135   op->reserved = nullptr;
136   op++;
137   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
138   op->flags = 0;
139   op->reserved = nullptr;
140   op++;
141   op->op = GRPC_OP_RECV_INITIAL_METADATA;
142   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
143   op->flags = 0;
144   op->reserved = nullptr;
145   op++;
146   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
147   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
148   op->data.recv_status_on_client.status = &status;
149   op->data.recv_status_on_client.status_details = &details;
150   op->flags = 0;
151   op->reserved = nullptr;
152   op++;
153   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
154                                 nullptr);
155   GPR_ASSERT(GRPC_CALL_OK == error);
156 
157   error =
158       grpc_server_request_call(f.server, &s, &call_details,
159                                &request_metadata_recv, f.cq, f.cq, tag(101));
160   GPR_ASSERT(GRPC_CALL_OK == error);
161 
162   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
163   cq_verify(cqv);
164 
165   GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
166   GPR_ASSERT(0 ==
167              grpc_slice_str_cmp(details, "Failure that's not preventable."));
168 
169   grpc_slice_unref(details);
170   grpc_metadata_array_destroy(&initial_metadata_recv);
171   grpc_metadata_array_destroy(&trailing_metadata_recv);
172   grpc_metadata_array_destroy(&request_metadata_recv);
173   grpc_call_details_destroy(&call_details);
174 
175   grpc_call_unref(c);
176 
177   cq_verifier_destroy(cqv);
178 
179   grpc_byte_buffer_destroy(request_payload);
180   grpc_byte_buffer_destroy(request_payload_recv);
181 
182   end_test(&f);
183   config.tear_down_data(&f);
184 }
185 
186 /*******************************************************************************
187  * Test filter - always closes incoming requests
188  */
189 
190 typedef struct {
191   grpc_closure* recv_im_ready;
192 } call_data;
193 
194 typedef struct {
195   uint8_t unused;
196 } channel_data;
197 
recv_im_ready(void * arg,grpc_error * error)198 static void recv_im_ready(void* arg, grpc_error* error) {
199   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
200   call_data* calld = static_cast<call_data*>(elem->call_data);
201   grpc_core::Closure::Run(
202       DEBUG_LOCATION, calld->recv_im_ready,
203       grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
204                              "Failure that's not preventable.", &error, 1),
205                          GRPC_ERROR_INT_GRPC_STATUS,
206                          GRPC_STATUS_PERMISSION_DENIED));
207 }
208 
start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)209 static void start_transport_stream_op_batch(
210     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
211   call_data* calld = static_cast<call_data*>(elem->call_data);
212   if (op->recv_initial_metadata) {
213     calld->recv_im_ready =
214         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
215     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
216         GRPC_CLOSURE_CREATE(recv_im_ready, elem, grpc_schedule_on_exec_ctx);
217   }
218   grpc_call_next_op(elem, op);
219 }
220 
init_call_elem(grpc_call_element *,const grpc_call_element_args *)221 static grpc_error* init_call_elem(grpc_call_element* /*elem*/,
222                                   const grpc_call_element_args* /*args*/) {
223   return GRPC_ERROR_NONE;
224 }
225 
destroy_call_elem(grpc_call_element *,const grpc_call_final_info *,grpc_closure *)226 static void destroy_call_elem(grpc_call_element* /*elem*/,
227                               const grpc_call_final_info* /*final_info*/,
228                               grpc_closure* /*ignored*/) {}
229 
init_channel_elem(grpc_channel_element *,grpc_channel_element_args *)230 static grpc_error* init_channel_elem(grpc_channel_element* /*elem*/,
231                                      grpc_channel_element_args* /*args*/) {
232   return GRPC_ERROR_NONE;
233 }
234 
destroy_channel_elem(grpc_channel_element *)235 static void destroy_channel_elem(grpc_channel_element* /*elem*/) {}
236 
237 static const grpc_channel_filter test_filter = {
238     start_transport_stream_op_batch,
239     grpc_channel_next_op,
240     sizeof(call_data),
241     init_call_elem,
242     grpc_call_stack_ignore_set_pollset_or_pollset_set,
243     destroy_call_elem,
244     sizeof(channel_data),
245     init_channel_elem,
246     destroy_channel_elem,
247     grpc_channel_next_get_info,
248     "filter_causes_close"};
249 
250 /*******************************************************************************
251  * Registration
252  */
253 
maybe_add_filter(grpc_channel_stack_builder * builder,void *)254 static bool maybe_add_filter(grpc_channel_stack_builder* builder,
255                              void* /*arg*/) {
256   if (g_enable_filter) {
257     return grpc_channel_stack_builder_prepend_filter(builder, &test_filter,
258                                                      nullptr, nullptr);
259   } else {
260     return true;
261   }
262 }
263 
init_plugin(void)264 static void init_plugin(void) {
265   grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, 0, maybe_add_filter,
266                                    nullptr);
267 }
268 
destroy_plugin(void)269 static void destroy_plugin(void) {}
270 
filter_causes_close(grpc_end2end_test_config config)271 void filter_causes_close(grpc_end2end_test_config config) {
272   g_enable_filter = true;
273   test_request(config);
274   g_enable_filter = false;
275 }
276 
filter_causes_close_pre_init(void)277 void filter_causes_close_pre_init(void) {
278   grpc_register_plugin(init_plugin, destroy_plugin);
279 }
280