1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "test/core/util/test_config.h"
22 
23 #ifdef GRPC_TEST_PICK_PORT
24 #include <math.h>
25 #include <string.h>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/sync.h>
32 #include <grpc/support/time.h>
33 
34 #include "src/core/lib/http/httpcli.h"
35 #include "test/core/util/port_server_client.h"
36 
37 typedef struct freereq {
38   gpr_mu* mu = nullptr;
39   grpc_polling_entity pops = {};
40   int done = 0;
41 } freereq;
42 
destroy_pops_and_shutdown(void * p,grpc_error_handle)43 static void destroy_pops_and_shutdown(void* p, grpc_error_handle /*error*/) {
44   grpc_pollset* pollset =
45       grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
46   grpc_pollset_destroy(pollset);
47   gpr_free(pollset);
48 }
49 
freed_port_from_server(void * arg,grpc_error_handle)50 static void freed_port_from_server(void* arg, grpc_error_handle /*error*/) {
51   freereq* pr = static_cast<freereq*>(arg);
52   gpr_mu_lock(pr->mu);
53   pr->done = 1;
54   GRPC_LOG_IF_ERROR(
55       "pollset_kick",
56       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
57   gpr_mu_unlock(pr->mu);
58 }
59 
grpc_free_port_using_server(int port)60 void grpc_free_port_using_server(int port) {
61   grpc_httpcli_context context;
62   grpc_httpcli_request req;
63   grpc_httpcli_response rsp;
64   freereq pr;
65   char* path;
66   grpc_closure* shutdown_closure;
67 
68   grpc_init();
69   {
70     grpc_core::ExecCtx exec_ctx;
71 
72     pr = {};
73     memset(&req, 0, sizeof(req));
74     rsp = {};
75 
76     grpc_pollset* pollset =
77         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
78     grpc_pollset_init(pollset, &pr.mu);
79     pr.pops = grpc_polling_entity_create_from_pollset(pollset);
80     shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
81                                            grpc_schedule_on_exec_ctx);
82 
83     req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
84     gpr_asprintf(&path, "/drop/%d", port);
85     req.http.path = path;
86 
87     grpc_httpcli_context_init(&context);
88     grpc_resource_quota* resource_quota =
89         grpc_resource_quota_create("port_server_client/free");
90     grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
91                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
92                      GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
93                                          grpc_schedule_on_exec_ctx),
94                      &rsp);
95     grpc_core::ExecCtx::Get()->Flush();
96     gpr_mu_lock(pr.mu);
97     while (!pr.done) {
98       grpc_pollset_worker* worker = nullptr;
99       if (!GRPC_LOG_IF_ERROR(
100               "pollset_work",
101               grpc_pollset_work(
102                   grpc_polling_entity_pollset(&pr.pops), &worker,
103                   grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
104         pr.done = 1;
105       }
106     }
107     gpr_mu_unlock(pr.mu);
108 
109     grpc_httpcli_context_destroy(&context);
110     grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
111                           shutdown_closure);
112 
113     gpr_free(path);
114     grpc_http_response_destroy(&rsp);
115   }
116   grpc_shutdown();
117 }
118 
119 typedef struct portreq {
120   gpr_mu* mu = nullptr;
121   grpc_polling_entity pops = {};
122   int port = 0;
123   int retries = 0;
124   char* server = nullptr;
125   grpc_httpcli_context* ctx = nullptr;
126   grpc_httpcli_response response = {};
127 } portreq;
128 
got_port_from_server(void * arg,grpc_error_handle error)129 static void got_port_from_server(void* arg, grpc_error_handle error) {
130   size_t i;
131   int port = 0;
132   portreq* pr = static_cast<portreq*>(arg);
133   int failed = 0;
134   grpc_httpcli_response* response = &pr->response;
135 
136   if (error != GRPC_ERROR_NONE) {
137     failed = 1;
138     gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]",
139             grpc_error_std_string(error).c_str());
140   } else if (response->status != 200) {
141     failed = 1;
142     gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
143             response->status);
144   }
145 
146   if (failed) {
147     grpc_httpcli_request req;
148     memset(&req, 0, sizeof(req));
149     if (pr->retries >= 5) {
150       gpr_mu_lock(pr->mu);
151       pr->port = 0;
152       GRPC_LOG_IF_ERROR(
153           "pollset_kick",
154           grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
155       gpr_mu_unlock(pr->mu);
156       return;
157     }
158     GPR_ASSERT(pr->retries < 10);
159     gpr_sleep_until(gpr_time_add(
160         gpr_now(GPR_CLOCK_REALTIME),
161         gpr_time_from_millis(
162             static_cast<int64_t>(
163                 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
164             GPR_TIMESPAN)));
165     pr->retries++;
166     req.host = pr->server;
167     req.http.path = const_cast<char*>("/get");
168     grpc_http_response_destroy(&pr->response);
169     pr->response = {};
170     grpc_resource_quota* resource_quota =
171         grpc_resource_quota_create("port_server_client/pick_retry");
172     grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
173                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
174                      GRPC_CLOSURE_CREATE(got_port_from_server, pr,
175                                          grpc_schedule_on_exec_ctx),
176                      &pr->response);
177     return;
178   }
179   GPR_ASSERT(response);
180   GPR_ASSERT(response->status == 200);
181   for (i = 0; i < response->body_length; i++) {
182     GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
183     port = port * 10 + response->body[i] - '0';
184   }
185   GPR_ASSERT(port > 1024);
186   gpr_mu_lock(pr->mu);
187   pr->port = port;
188   GRPC_LOG_IF_ERROR(
189       "pollset_kick",
190       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
191   gpr_mu_unlock(pr->mu);
192 }
193 
grpc_pick_port_using_server(void)194 int grpc_pick_port_using_server(void) {
195   grpc_httpcli_context context;
196   grpc_httpcli_request req;
197   portreq pr;
198   grpc_closure* shutdown_closure;
199 
200   grpc_init();
201   {
202     grpc_core::ExecCtx exec_ctx;
203     pr = {};
204     memset(&req, 0, sizeof(req));
205     grpc_pollset* pollset =
206         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
207     grpc_pollset_init(pollset, &pr.mu);
208     pr.pops = grpc_polling_entity_create_from_pollset(pollset);
209     shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
210                                            grpc_schedule_on_exec_ctx);
211     pr.port = -1;
212     pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
213     pr.ctx = &context;
214 
215     req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
216     req.http.path = const_cast<char*>("/get");
217 
218     grpc_httpcli_context_init(&context);
219     grpc_resource_quota* resource_quota =
220         grpc_resource_quota_create("port_server_client/pick");
221     grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
222                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
223                      GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
224                                          grpc_schedule_on_exec_ctx),
225                      &pr.response);
226     grpc_core::ExecCtx::Get()->Flush();
227     gpr_mu_lock(pr.mu);
228     while (pr.port == -1) {
229       grpc_pollset_worker* worker = nullptr;
230       if (!GRPC_LOG_IF_ERROR(
231               "pollset_work",
232               grpc_pollset_work(
233                   grpc_polling_entity_pollset(&pr.pops), &worker,
234                   grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
235         pr.port = 0;
236       }
237     }
238     gpr_mu_unlock(pr.mu);
239 
240     grpc_http_response_destroy(&pr.response);
241     grpc_httpcli_context_destroy(&context);
242     grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
243                           shutdown_closure);
244 
245     grpc_core::ExecCtx::Get()->Flush();
246   }
247   grpc_shutdown();
248 
249   return pr.port;
250 }
251 
252 #endif  // GRPC_TEST_PICK_PORT
253