1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 #include "test/core/util/test_config.h"
21 
22 #ifdef GRPC_TEST_PICK_PORT
23 #include "test/core/util/port_server_client.h"
24 
25 #include <math.h>
26 #include <string.h>
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/lib/http/httpcli.h"
36 
37 typedef struct freereq {
38   gpr_mu* mu;
39   grpc_polling_entity pops;
40   int done;
41 } freereq;
42 
destroy_pops_and_shutdown(void * p,grpc_error * error)43 static void destroy_pops_and_shutdown(void* p, grpc_error* error) {
44   grpc_pollset* pollset =
45       grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
46   grpc_pollset_destroy(pollset);
47   gpr_free(pollset);
48 }
49 
freed_port_from_server(void * arg,grpc_error * error)50 static void freed_port_from_server(void* arg, grpc_error* error) {
51   freereq* pr = static_cast<freereq*>(arg);
52   gpr_mu_lock(pr->mu);
53   pr->done = 1;
54   GRPC_LOG_IF_ERROR(
55       "pollset_kick",
56       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
57   gpr_mu_unlock(pr->mu);
58 }
59 
grpc_free_port_using_server(int port)60 void grpc_free_port_using_server(int port) {
61   grpc_httpcli_context context;
62   grpc_httpcli_request req;
63   grpc_httpcli_response rsp;
64   freereq pr;
65   char* path;
66   grpc_core::ExecCtx exec_ctx;
67   grpc_closure* shutdown_closure;
68 
69   grpc_init();
70 
71   memset(&pr, 0, sizeof(pr));
72   memset(&req, 0, sizeof(req));
73   memset(&rsp, 0, sizeof(rsp));
74 
75   grpc_pollset* pollset =
76       static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
77   grpc_pollset_init(pollset, &pr.mu);
78   pr.pops = grpc_polling_entity_create_from_pollset(pollset);
79   shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
80                                          grpc_schedule_on_exec_ctx);
81 
82   req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
83   gpr_asprintf(&path, "/drop/%d", port);
84   req.http.path = path;
85 
86   grpc_httpcli_context_init(&context);
87   grpc_resource_quota* resource_quota =
88       grpc_resource_quota_create("port_server_client/free");
89   grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
90                    grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
91                    GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
92                                        grpc_schedule_on_exec_ctx),
93                    &rsp);
94   grpc_resource_quota_unref_internal(resource_quota);
95   grpc_core::ExecCtx::Get()->Flush();
96   gpr_mu_lock(pr.mu);
97   while (!pr.done) {
98     grpc_pollset_worker* worker = nullptr;
99     if (!GRPC_LOG_IF_ERROR(
100             "pollset_work",
101             grpc_pollset_work(
102                 grpc_polling_entity_pollset(&pr.pops), &worker,
103                 grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
104       pr.done = 1;
105     }
106   }
107   gpr_mu_unlock(pr.mu);
108 
109   grpc_httpcli_context_destroy(&context);
110   grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
111                         shutdown_closure);
112 
113   gpr_free(path);
114   grpc_http_response_destroy(&rsp);
115 
116   grpc_shutdown();
117 }
118 
119 typedef struct portreq {
120   gpr_mu* mu;
121   grpc_polling_entity pops;
122   int port;
123   int retries;
124   char* server;
125   grpc_httpcli_context* ctx;
126   grpc_httpcli_response response;
127 } portreq;
128 
got_port_from_server(void * arg,grpc_error * error)129 static void got_port_from_server(void* arg, grpc_error* error) {
130   size_t i;
131   int port = 0;
132   portreq* pr = static_cast<portreq*>(arg);
133   int failed = 0;
134   grpc_httpcli_response* response = &pr->response;
135 
136   if (error != GRPC_ERROR_NONE) {
137     failed = 1;
138     const char* msg = grpc_error_string(error);
139     gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]", msg);
140 
141   } else if (response->status != 200) {
142     failed = 1;
143     gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
144             response->status);
145   }
146 
147   if (failed) {
148     grpc_httpcli_request req;
149     memset(&req, 0, sizeof(req));
150     if (pr->retries >= 5) {
151       gpr_mu_lock(pr->mu);
152       pr->port = 0;
153       GRPC_LOG_IF_ERROR(
154           "pollset_kick",
155           grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
156       gpr_mu_unlock(pr->mu);
157       return;
158     }
159     GPR_ASSERT(pr->retries < 10);
160     gpr_sleep_until(gpr_time_add(
161         gpr_now(GPR_CLOCK_REALTIME),
162         gpr_time_from_millis(
163             static_cast<int64_t>(
164                 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
165             GPR_TIMESPAN)));
166     pr->retries++;
167     req.host = pr->server;
168     req.http.path = const_cast<char*>("/get");
169     grpc_http_response_destroy(&pr->response);
170     memset(&pr->response, 0, sizeof(pr->response));
171     grpc_resource_quota* resource_quota =
172         grpc_resource_quota_create("port_server_client/pick_retry");
173     grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
174                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
175                      GRPC_CLOSURE_CREATE(got_port_from_server, pr,
176                                          grpc_schedule_on_exec_ctx),
177                      &pr->response);
178     grpc_resource_quota_unref_internal(resource_quota);
179     return;
180   }
181   GPR_ASSERT(response);
182   GPR_ASSERT(response->status == 200);
183   for (i = 0; i < response->body_length; i++) {
184     GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
185     port = port * 10 + response->body[i] - '0';
186   }
187   GPR_ASSERT(port > 1024);
188   gpr_mu_lock(pr->mu);
189   pr->port = port;
190   GRPC_LOG_IF_ERROR(
191       "pollset_kick",
192       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
193   gpr_mu_unlock(pr->mu);
194 }
195 
grpc_pick_port_using_server(void)196 int grpc_pick_port_using_server(void) {
197   grpc_httpcli_context context;
198   grpc_httpcli_request req;
199   portreq pr;
200   grpc_closure* shutdown_closure;
201 
202   grpc_init();
203   {
204     grpc_core::ExecCtx exec_ctx;
205     memset(&pr, 0, sizeof(pr));
206     memset(&req, 0, sizeof(req));
207     grpc_pollset* pollset =
208         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
209     grpc_pollset_init(pollset, &pr.mu);
210     pr.pops = grpc_polling_entity_create_from_pollset(pollset);
211     shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
212                                            grpc_schedule_on_exec_ctx);
213     pr.port = -1;
214     pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
215     pr.ctx = &context;
216 
217     req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
218     req.http.path = const_cast<char*>("/get");
219 
220     grpc_httpcli_context_init(&context);
221     grpc_resource_quota* resource_quota =
222         grpc_resource_quota_create("port_server_client/pick");
223     grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
224                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
225                      GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
226                                          grpc_schedule_on_exec_ctx),
227                      &pr.response);
228     grpc_resource_quota_unref_internal(resource_quota);
229     grpc_core::ExecCtx::Get()->Flush();
230     gpr_mu_lock(pr.mu);
231     while (pr.port == -1) {
232       grpc_pollset_worker* worker = nullptr;
233       if (!GRPC_LOG_IF_ERROR(
234               "pollset_work",
235               grpc_pollset_work(
236                   grpc_polling_entity_pollset(&pr.pops), &worker,
237                   grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
238         pr.port = 0;
239       }
240     }
241     gpr_mu_unlock(pr.mu);
242 
243     grpc_http_response_destroy(&pr.response);
244     grpc_httpcli_context_destroy(&context);
245     grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
246                           shutdown_closure);
247 
248     grpc_core::ExecCtx::Get()->Flush();
249   }
250   grpc_shutdown();
251 
252   return pr.port;
253 }
254 
255 #endif  // GRPC_TEST_PICK_PORT
256