1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/end2end/fixtures/proxy.h"
20 
21 #include <string.h>
22 
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/host_port.h"
29 #include "src/core/lib/gprpp/memory.h"
30 #include "src/core/lib/gprpp/thd.h"
31 #include "test/core/util/port.h"
32 
33 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy34   grpc_end2end_proxy()
35       : cq(nullptr),
36         server(nullptr),
37         client(nullptr),
38         shutdown(false),
39         new_call(nullptr) {
40     memset(&new_call_details, 0, sizeof(new_call_details));
41     memset(&new_call_metadata, 0, sizeof(new_call_metadata));
42   }
43   grpc_core::Thread thd;
44   std::string proxy_port;
45   std::string server_port;
46   grpc_completion_queue* cq;
47   grpc_server* server;
48   grpc_channel* client;
49 
50   int shutdown;
51 
52   /* requested call */
53   grpc_call* new_call;
54   grpc_call_details new_call_details;
55   grpc_metadata_array new_call_metadata;
56 };
57 
58 typedef struct {
59   void (*func)(void* arg, int success);
60   void* arg;
61 } closure;
62 
63 typedef struct {
64   gpr_refcount refs;
65   grpc_end2end_proxy* proxy;
66 
67   grpc_call* c2p;
68   grpc_call* p2s;
69 
70   grpc_metadata_array c2p_initial_metadata;
71   grpc_metadata_array p2s_initial_metadata;
72 
73   grpc_byte_buffer* c2p_msg;
74   grpc_byte_buffer* p2s_msg;
75 
76   grpc_metadata_array p2s_trailing_metadata;
77   grpc_status_code p2s_status;
78   grpc_slice p2s_status_details;
79 
80   int c2p_server_cancelled;
81 } proxy_call;
82 
83 static void thread_main(void* arg);
84 static void request_call(grpc_end2end_proxy* proxy);
85 
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,grpc_channel_args * client_args,grpc_channel_args * server_args)86 grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
87                                               grpc_channel_args* client_args,
88                                               grpc_channel_args* server_args) {
89   int proxy_port = grpc_pick_unused_port_or_die();
90   int server_port = grpc_pick_unused_port_or_die();
91 
92   grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
93 
94   proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
95   proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
96 
97   gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port.c_str(),
98           proxy->server_port.c_str());
99 
100   proxy->cq = grpc_completion_queue_create_for_next(nullptr);
101   proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
102   proxy->client = def->create_client(proxy->server_port.c_str(), client_args);
103 
104   grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
105   grpc_server_start(proxy->server);
106 
107   grpc_call_details_init(&proxy->new_call_details);
108   proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
109   proxy->thd.Start();
110 
111   request_call(proxy);
112 
113   return proxy;
114 }
115 
new_closure(void (* func)(void * arg,int success),void * arg)116 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
117   closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
118   cl->func = func;
119   cl->arg = arg;
120   return cl;
121 }
122 
shutdown_complete(void * arg,int)123 static void shutdown_complete(void* arg, int /*success*/) {
124   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
125   proxy->shutdown = 1;
126   grpc_completion_queue_shutdown(proxy->cq);
127 }
128 
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)129 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
130   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
131                                   new_closure(shutdown_complete, proxy));
132   proxy->thd.Join();
133   grpc_server_destroy(proxy->server);
134   grpc_channel_destroy(proxy->client);
135   grpc_completion_queue_destroy(proxy->cq);
136   grpc_call_details_destroy(&proxy->new_call_details);
137   delete proxy;
138 }
139 
unrefpc(proxy_call * pc,const char *)140 static void unrefpc(proxy_call* pc, const char* /*reason*/) {
141   if (gpr_unref(&pc->refs)) {
142     grpc_call_unref(pc->c2p);
143     grpc_call_unref(pc->p2s);
144     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
145     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
146     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
147     grpc_slice_unref(pc->p2s_status_details);
148     gpr_free(pc);
149   }
150 }
151 
refpc(proxy_call * pc,const char *)152 static void refpc(proxy_call* pc, const char* /*reason*/) {
153   gpr_ref(&pc->refs);
154 }
155 
on_c2p_sent_initial_metadata(void * arg,int)156 static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
157   proxy_call* pc = static_cast<proxy_call*>(arg);
158   unrefpc(pc, "on_c2p_sent_initial_metadata");
159 }
160 
on_p2s_recv_initial_metadata(void * arg,int)161 static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
162   proxy_call* pc = static_cast<proxy_call*>(arg);
163   grpc_op op;
164   grpc_call_error err;
165 
166   memset(&op, 0, sizeof(op));
167   if (!pc->proxy->shutdown) {
168     op.op = GRPC_OP_SEND_INITIAL_METADATA;
169     op.flags = 0;
170     op.reserved = nullptr;
171     op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
172     op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
173     refpc(pc, "on_c2p_sent_initial_metadata");
174     err = grpc_call_start_batch(pc->c2p, &op, 1,
175                                 new_closure(on_c2p_sent_initial_metadata, pc),
176                                 nullptr);
177     GPR_ASSERT(err == GRPC_CALL_OK);
178   }
179 
180   unrefpc(pc, "on_p2s_recv_initial_metadata");
181 }
182 
on_p2s_sent_initial_metadata(void * arg,int)183 static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
184   proxy_call* pc = static_cast<proxy_call*>(arg);
185   unrefpc(pc, "on_p2s_sent_initial_metadata");
186 }
187 
188 static void on_c2p_recv_msg(void* arg, int success);
189 
on_p2s_sent_message(void * arg,int success)190 static void on_p2s_sent_message(void* arg, int success) {
191   proxy_call* pc = static_cast<proxy_call*>(arg);
192   grpc_op op;
193   grpc_call_error err;
194 
195   grpc_byte_buffer_destroy(pc->c2p_msg);
196   if (!pc->proxy->shutdown && success) {
197     op.op = GRPC_OP_RECV_MESSAGE;
198     op.flags = 0;
199     op.reserved = nullptr;
200     op.data.recv_message.recv_message = &pc->c2p_msg;
201     refpc(pc, "on_c2p_recv_msg");
202     err = grpc_call_start_batch(pc->c2p, &op, 1,
203                                 new_closure(on_c2p_recv_msg, pc), nullptr);
204     GPR_ASSERT(err == GRPC_CALL_OK);
205   }
206 
207   unrefpc(pc, "on_p2s_sent_message");
208 }
209 
on_p2s_sent_close(void * arg,int)210 static void on_p2s_sent_close(void* arg, int /*success*/) {
211   proxy_call* pc = static_cast<proxy_call*>(arg);
212   unrefpc(pc, "on_p2s_sent_close");
213 }
214 
on_c2p_recv_msg(void * arg,int success)215 static void on_c2p_recv_msg(void* arg, int success) {
216   proxy_call* pc = static_cast<proxy_call*>(arg);
217   grpc_op op;
218   grpc_call_error err;
219 
220   if (!pc->proxy->shutdown && success) {
221     if (pc->c2p_msg != nullptr) {
222       op.op = GRPC_OP_SEND_MESSAGE;
223       op.flags = 0;
224       op.reserved = nullptr;
225       op.data.send_message.send_message = pc->c2p_msg;
226       refpc(pc, "on_p2s_sent_message");
227       err = grpc_call_start_batch(
228           pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
229       GPR_ASSERT(err == GRPC_CALL_OK);
230     } else {
231       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
232       op.flags = 0;
233       op.reserved = nullptr;
234       refpc(pc, "on_p2s_sent_close");
235       err = grpc_call_start_batch(pc->p2s, &op, 1,
236                                   new_closure(on_p2s_sent_close, pc), nullptr);
237       GPR_ASSERT(err == GRPC_CALL_OK);
238     }
239   } else {
240     if (pc->c2p_msg != nullptr) {
241       grpc_byte_buffer_destroy(pc->c2p_msg);
242     }
243   }
244 
245   unrefpc(pc, "on_c2p_recv_msg");
246 }
247 
248 static void on_p2s_recv_msg(void* arg, int success);
249 
on_c2p_sent_message(void * arg,int success)250 static void on_c2p_sent_message(void* arg, int success) {
251   proxy_call* pc = static_cast<proxy_call*>(arg);
252   grpc_op op;
253   grpc_call_error err;
254 
255   grpc_byte_buffer_destroy(pc->p2s_msg);
256   if (!pc->proxy->shutdown && success) {
257     op.op = GRPC_OP_RECV_MESSAGE;
258     op.flags = 0;
259     op.reserved = nullptr;
260     op.data.recv_message.recv_message = &pc->p2s_msg;
261     refpc(pc, "on_p2s_recv_msg");
262     err = grpc_call_start_batch(pc->p2s, &op, 1,
263                                 new_closure(on_p2s_recv_msg, pc), nullptr);
264     GPR_ASSERT(err == GRPC_CALL_OK);
265   }
266 
267   unrefpc(pc, "on_c2p_sent_message");
268 }
269 
on_p2s_recv_msg(void * arg,int success)270 static void on_p2s_recv_msg(void* arg, int success) {
271   proxy_call* pc = static_cast<proxy_call*>(arg);
272   grpc_op op;
273   grpc_call_error err;
274 
275   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
276     op.op = GRPC_OP_SEND_MESSAGE;
277     op.flags = 0;
278     op.reserved = nullptr;
279     op.data.send_message.send_message = pc->p2s_msg;
280     refpc(pc, "on_c2p_sent_message");
281     err = grpc_call_start_batch(pc->c2p, &op, 1,
282                                 new_closure(on_c2p_sent_message, pc), nullptr);
283     GPR_ASSERT(err == GRPC_CALL_OK);
284   } else {
285     grpc_byte_buffer_destroy(pc->p2s_msg);
286   }
287   unrefpc(pc, "on_p2s_recv_msg");
288 }
289 
on_c2p_sent_status(void * arg,int)290 static void on_c2p_sent_status(void* arg, int /*success*/) {
291   proxy_call* pc = static_cast<proxy_call*>(arg);
292   unrefpc(pc, "on_c2p_sent_status");
293 }
294 
on_p2s_status(void * arg,int success)295 static void on_p2s_status(void* arg, int success) {
296   proxy_call* pc = static_cast<proxy_call*>(arg);
297   grpc_op op;
298   grpc_call_error err;
299 
300   if (!pc->proxy->shutdown) {
301     GPR_ASSERT(success);
302     op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
303     op.flags = 0;
304     op.reserved = nullptr;
305     op.data.send_status_from_server.trailing_metadata_count =
306         pc->p2s_trailing_metadata.count;
307     op.data.send_status_from_server.trailing_metadata =
308         pc->p2s_trailing_metadata.metadata;
309     op.data.send_status_from_server.status = pc->p2s_status;
310     op.data.send_status_from_server.status_details = &pc->p2s_status_details;
311     refpc(pc, "on_c2p_sent_status");
312     err = grpc_call_start_batch(pc->c2p, &op, 1,
313                                 new_closure(on_c2p_sent_status, pc), nullptr);
314     GPR_ASSERT(err == GRPC_CALL_OK);
315   }
316 
317   unrefpc(pc, "on_p2s_status");
318 }
319 
on_c2p_closed(void * arg,int)320 static void on_c2p_closed(void* arg, int /*success*/) {
321   proxy_call* pc = static_cast<proxy_call*>(arg);
322   unrefpc(pc, "on_c2p_closed");
323 }
324 
on_new_call(void * arg,int success)325 static void on_new_call(void* arg, int success) {
326   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
327   grpc_call_error err;
328 
329   if (success) {
330     grpc_op op;
331     memset(&op, 0, sizeof(op));
332     proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
333     memset(pc, 0, sizeof(*pc));
334     pc->proxy = proxy;
335     GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
336              proxy->new_call_metadata);
337     pc->c2p = proxy->new_call;
338     pc->p2s = grpc_channel_create_call(
339         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
340         proxy->new_call_details.method, &proxy->new_call_details.host,
341         proxy->new_call_details.deadline, nullptr);
342     gpr_ref_init(&pc->refs, 1);
343 
344     op.reserved = nullptr;
345 
346     op.op = GRPC_OP_RECV_INITIAL_METADATA;
347     op.flags = 0;
348     op.data.recv_initial_metadata.recv_initial_metadata =
349         &pc->p2s_initial_metadata;
350     refpc(pc, "on_p2s_recv_initial_metadata");
351     err = grpc_call_start_batch(pc->p2s, &op, 1,
352                                 new_closure(on_p2s_recv_initial_metadata, pc),
353                                 nullptr);
354     GPR_ASSERT(err == GRPC_CALL_OK);
355 
356     op.op = GRPC_OP_SEND_INITIAL_METADATA;
357     op.flags = proxy->new_call_details.flags;
358     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
359     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
360     refpc(pc, "on_p2s_sent_initial_metadata");
361     err = grpc_call_start_batch(pc->p2s, &op, 1,
362                                 new_closure(on_p2s_sent_initial_metadata, pc),
363                                 nullptr);
364     GPR_ASSERT(err == GRPC_CALL_OK);
365 
366     op.op = GRPC_OP_RECV_MESSAGE;
367     op.flags = 0;
368     op.data.recv_message.recv_message = &pc->c2p_msg;
369     refpc(pc, "on_c2p_recv_msg");
370     err = grpc_call_start_batch(pc->c2p, &op, 1,
371                                 new_closure(on_c2p_recv_msg, pc), nullptr);
372     GPR_ASSERT(err == GRPC_CALL_OK);
373 
374     op.op = GRPC_OP_RECV_MESSAGE;
375     op.flags = 0;
376     op.data.recv_message.recv_message = &pc->p2s_msg;
377     refpc(pc, "on_p2s_recv_msg");
378     err = grpc_call_start_batch(pc->p2s, &op, 1,
379                                 new_closure(on_p2s_recv_msg, pc), nullptr);
380     GPR_ASSERT(err == GRPC_CALL_OK);
381 
382     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
383     op.flags = 0;
384     op.data.recv_status_on_client.trailing_metadata =
385         &pc->p2s_trailing_metadata;
386     op.data.recv_status_on_client.status = &pc->p2s_status;
387     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
388     refpc(pc, "on_p2s_status");
389     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
390                                 nullptr);
391     GPR_ASSERT(err == GRPC_CALL_OK);
392 
393     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
394     op.flags = 0;
395     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
396     refpc(pc, "on_c2p_closed");
397     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
398                                 nullptr);
399     GPR_ASSERT(err == GRPC_CALL_OK);
400 
401     request_call(proxy);
402 
403     grpc_call_details_destroy(&proxy->new_call_details);
404     grpc_call_details_init(&proxy->new_call_details);
405 
406     unrefpc(pc, "init");
407   } else {
408     GPR_ASSERT(proxy->new_call == nullptr);
409   }
410 }
411 
request_call(grpc_end2end_proxy * proxy)412 static void request_call(grpc_end2end_proxy* proxy) {
413   proxy->new_call = nullptr;
414   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
415                                  proxy->server, &proxy->new_call,
416                                  &proxy->new_call_details,
417                                  &proxy->new_call_metadata, proxy->cq,
418                                  proxy->cq, new_closure(on_new_call, proxy)));
419 }
420 
thread_main(void * arg)421 static void thread_main(void* arg) {
422   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
423   closure* cl;
424   for (;;) {
425     grpc_event ev = grpc_completion_queue_next(
426         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
427     switch (ev.type) {
428       case GRPC_QUEUE_TIMEOUT:
429         gpr_log(GPR_ERROR, "Should never reach here");
430         abort();
431       case GRPC_QUEUE_SHUTDOWN:
432         return;
433       case GRPC_OP_COMPLETE:
434         cl = static_cast<closure*>(ev.tag);
435         cl->func(cl->arg, ev.success);
436         gpr_free(cl);
437         break;
438     }
439   }
440 }
441 
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)442 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
443   return proxy->proxy_port.c_str();
444 }
445 
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)446 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
447   return proxy->server_port.c_str();
448 }
449