1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/http/httpcli.h"
22 
23 #include <string.h>
24 
25 #include <string>
26 
27 #include "absl/strings/str_format.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/string_util.h>
34 
35 #include "src/core/lib/address_utils/sockaddr_utils.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gpr/string.h"
38 #include "src/core/lib/gprpp/memory.h"
39 #include "src/core/lib/http/format_request.h"
40 #include "src/core/lib/http/parser.h"
41 #include "src/core/lib/iomgr/endpoint.h"
42 #include "src/core/lib/iomgr/iomgr_internal.h"
43 #include "src/core/lib/iomgr/resolve_address.h"
44 #include "src/core/lib/iomgr/tcp_client.h"
45 #include "src/core/lib/resource_quota/api.h"
46 #include "src/core/lib/slice/slice_internal.h"
47 
48 namespace grpc_core {
49 namespace {
50 
51 class InternalRequest {
52  public:
InternalRequest(const grpc_slice & request_text,grpc_httpcli_response * response,ResourceQuotaRefPtr resource_quota,absl::string_view host,absl::string_view ssl_host_override,grpc_millis deadline,const grpc_httpcli_handshaker * handshaker,grpc_closure * on_done,grpc_httpcli_context * context,grpc_polling_entity * pollent,const char * name)53   InternalRequest(const grpc_slice& request_text,
54                   grpc_httpcli_response* response,
55                   ResourceQuotaRefPtr resource_quota, absl::string_view host,
56                   absl::string_view ssl_host_override, grpc_millis deadline,
57                   const grpc_httpcli_handshaker* handshaker,
58                   grpc_closure* on_done, grpc_httpcli_context* context,
59                   grpc_polling_entity* pollent, const char* name)
60       : request_text_(request_text),
61         resource_quota_(std::move(resource_quota)),
62         host_(host),
63         ssl_host_override_(ssl_host_override),
64         deadline_(deadline),
65         handshaker_(handshaker),
66         on_done_(on_done),
67         context_(context),
68         pollent_(pollent) {
69     grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
70     grpc_slice_buffer_init(&incoming_);
71     grpc_slice_buffer_init(&outgoing_);
72     grpc_iomgr_register_object(&iomgr_obj_, name);
73 
74     GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
75     GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
76     GPR_ASSERT(pollent);
77     grpc_polling_entity_add_to_pollset_set(pollent_, context->pollset_set);
78     grpc_resolve_address(
79         host_.c_str(), handshaker_->default_port, context_->pollset_set,
80         GRPC_CLOSURE_CREATE(OnResolved, this, grpc_schedule_on_exec_ctx),
81         &addresses_);
82   }
83 
~InternalRequest()84   ~InternalRequest() {
85     grpc_http_parser_destroy(&parser_);
86     if (addresses_ != nullptr) {
87       grpc_resolved_addresses_destroy(addresses_);
88     }
89     if (ep_ != nullptr) {
90       grpc_endpoint_destroy(ep_);
91     }
92     grpc_slice_unref_internal(request_text_);
93     grpc_iomgr_unregister_object(&iomgr_obj_);
94     grpc_slice_buffer_destroy_internal(&incoming_);
95     grpc_slice_buffer_destroy_internal(&outgoing_);
96     GRPC_ERROR_UNREF(overall_error_);
97   }
98 
99  private:
Finish(grpc_error_handle error)100   void Finish(grpc_error_handle error) {
101     grpc_polling_entity_del_from_pollset_set(pollent_, context_->pollset_set);
102     ExecCtx::Run(DEBUG_LOCATION, on_done_, error);
103     delete this;
104   }
105 
AppendError(grpc_error_handle error)106   void AppendError(grpc_error_handle error) {
107     if (overall_error_ == GRPC_ERROR_NONE) {
108       overall_error_ =
109           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request");
110     }
111     grpc_resolved_address* addr = &addresses_->addrs[next_address_ - 1];
112     std::string addr_text = grpc_sockaddr_to_uri(addr);
113     overall_error_ = grpc_error_add_child(
114         overall_error_,
115         grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text));
116   }
117 
DoRead()118   void DoRead() {
119     grpc_endpoint_read(ep_, &incoming_, &on_read_, /*urgent=*/true);
120   }
121 
OnRead(void * user_data,grpc_error_handle error)122   static void OnRead(void* user_data, grpc_error_handle error) {
123     InternalRequest* req = static_cast<InternalRequest*>(user_data);
124     req->OnReadInternal(error);
125   }
126 
OnReadInternal(grpc_error_handle error)127   void OnReadInternal(grpc_error_handle error) {
128     size_t i;
129 
130     for (i = 0; i < incoming_.count; i++) {
131       if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
132         have_read_byte_ = 1;
133         grpc_error_handle err =
134             grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
135         if (err != GRPC_ERROR_NONE) {
136           Finish(err);
137           return;
138         }
139       }
140     }
141 
142     if (error == GRPC_ERROR_NONE) {
143       DoRead();
144     } else if (!have_read_byte_) {
145       NextAddress(GRPC_ERROR_REF(error));
146     } else {
147       Finish(grpc_http_parser_eof(&parser_));
148     }
149   }
150 
OnWritten()151   void OnWritten() { DoRead(); }
152 
DoneWrite(void * arg,grpc_error_handle error)153   static void DoneWrite(void* arg, grpc_error_handle error) {
154     InternalRequest* req = static_cast<InternalRequest*>(arg);
155     if (error == GRPC_ERROR_NONE) {
156       req->OnWritten();
157     } else {
158       req->NextAddress(GRPC_ERROR_REF(error));
159     }
160   }
161 
StartWrite()162   void StartWrite() {
163     grpc_slice_ref_internal(request_text_);
164     grpc_slice_buffer_add(&outgoing_, request_text_);
165     grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr);
166   }
167 
OnHandshakeDone(void * arg,grpc_endpoint * ep)168   static void OnHandshakeDone(void* arg, grpc_endpoint* ep) {
169     InternalRequest* req = static_cast<InternalRequest*>(arg);
170 
171     if (!ep) {
172       req->NextAddress(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
173           "Unexplained handshake failure"));
174       return;
175     }
176 
177     req->ep_ = ep;
178     req->StartWrite();
179   }
180 
OnConnected(void * arg,grpc_error_handle error)181   static void OnConnected(void* arg, grpc_error_handle error) {
182     InternalRequest* req = static_cast<InternalRequest*>(arg);
183 
184     if (!req->ep_) {
185       req->NextAddress(GRPC_ERROR_REF(error));
186       return;
187     }
188     req->handshaker_->handshake(req, req->ep_,
189                                 req->ssl_host_override_.empty()
190                                     ? req->host_.c_str()
191                                     : req->ssl_host_override_.c_str(),
192                                 req->deadline_, OnHandshakeDone);
193   }
194 
NextAddress(grpc_error_handle error)195   void NextAddress(grpc_error_handle error) {
196     grpc_resolved_address* addr;
197     if (error != GRPC_ERROR_NONE) {
198       AppendError(error);
199     }
200     if (next_address_ == addresses_->naddrs) {
201       Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
202           "Failed HTTP requests to all targets", &overall_error_, 1));
203       return;
204     }
205     addr = &addresses_->addrs[next_address_++];
206     GRPC_CLOSURE_INIT(&connected_, OnConnected, this,
207                       grpc_schedule_on_exec_ctx);
208     grpc_arg rq_arg = grpc_channel_arg_pointer_create(
209         const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA), resource_quota_->c_ptr(),
210         grpc_resource_quota_arg_vtable());
211     grpc_channel_args channel_args{1, &rq_arg};
212     auto* args = CoreConfiguration::Get()
213                      .channel_args_preconditioning()
214                      .PreconditionChannelArgs(&channel_args);
215     grpc_tcp_client_connect(&connected_, &ep_, context_->pollset_set, args,
216                             addr, deadline_);
217     grpc_channel_args_destroy(args);
218   }
219 
OnResolved(void * arg,grpc_error_handle error)220   static void OnResolved(void* arg, grpc_error_handle error) {
221     InternalRequest* req = static_cast<InternalRequest*>(arg);
222     if (error != GRPC_ERROR_NONE) {
223       req->Finish(GRPC_ERROR_REF(error));
224       return;
225     }
226     req->next_address_ = 0;
227     req->NextAddress(GRPC_ERROR_NONE);
228   }
229 
230   grpc_slice request_text_;
231   grpc_http_parser parser_;
232   grpc_resolved_addresses* addresses_ = nullptr;
233   size_t next_address_ = 0;
234   grpc_endpoint* ep_ = nullptr;
235   ResourceQuotaRefPtr resource_quota_;
236   std::string host_;
237   std::string ssl_host_override_;
238   grpc_millis deadline_;
239   int have_read_byte_ = 0;
240   const grpc_httpcli_handshaker* handshaker_;
241   grpc_closure* on_done_;
242   grpc_httpcli_context* context_;
243   grpc_polling_entity* pollent_;
244   grpc_iomgr_object iomgr_obj_;
245   grpc_slice_buffer incoming_;
246   grpc_slice_buffer outgoing_;
247   grpc_closure on_read_;
248   grpc_closure done_write_;
249   grpc_closure connected_;
250   grpc_error_handle overall_error_ = GRPC_ERROR_NONE;
251 };
252 
253 }  // namespace
254 }  // namespace grpc_core
255 
256 static grpc_httpcli_get_override g_get_override = nullptr;
257 static grpc_httpcli_post_override g_post_override = nullptr;
258 
plaintext_handshake(void * arg,grpc_endpoint * endpoint,const char *,grpc_millis,void (* on_done)(void * arg,grpc_endpoint * endpoint))259 static void plaintext_handshake(void* arg, grpc_endpoint* endpoint,
260                                 const char* /*host*/, grpc_millis /*deadline*/,
261                                 void (*on_done)(void* arg,
262                                                 grpc_endpoint* endpoint)) {
263   on_done(arg, endpoint);
264 }
265 
266 const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
267                                                         plaintext_handshake};
268 
grpc_httpcli_context_init(grpc_httpcli_context * context)269 void grpc_httpcli_context_init(grpc_httpcli_context* context) {
270   context->pollset_set = grpc_pollset_set_create();
271 }
272 
grpc_httpcli_context_destroy(grpc_httpcli_context * context)273 void grpc_httpcli_context_destroy(grpc_httpcli_context* context) {
274   grpc_pollset_set_destroy(context->pollset_set);
275 }
276 
internal_request_begin(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response,const char * name,const grpc_slice & request_text)277 static void internal_request_begin(
278     grpc_httpcli_context* context, grpc_polling_entity* pollent,
279     grpc_core::ResourceQuotaRefPtr resource_quota,
280     const grpc_httpcli_request* request, grpc_millis deadline,
281     grpc_closure* on_done, grpc_httpcli_response* response, const char* name,
282     const grpc_slice& request_text) {
283   new grpc_core::InternalRequest(
284       request_text, response, std::move(resource_quota), request->host,
285       request->ssl_host_override, deadline,
286       request->handshaker ? request->handshaker : &grpc_httpcli_plaintext,
287       on_done, context, pollent, name);
288 }
289 
grpc_httpcli_get(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response)290 void grpc_httpcli_get(grpc_httpcli_context* context,
291                       grpc_polling_entity* pollent,
292                       grpc_core::ResourceQuotaRefPtr resource_quota,
293                       const grpc_httpcli_request* request, grpc_millis deadline,
294                       grpc_closure* on_done, grpc_httpcli_response* response) {
295   if (g_get_override && g_get_override(request, deadline, on_done, response)) {
296     return;
297   }
298   std::string name =
299       absl::StrFormat("HTTP:GET:%s:%s", request->host, request->http.path);
300   internal_request_begin(context, pollent, std::move(resource_quota), request,
301                          deadline, on_done, response, name.c_str(),
302                          grpc_httpcli_format_get_request(request));
303 }
304 
grpc_httpcli_post(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,const char * body_bytes,size_t body_size,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response)305 void grpc_httpcli_post(grpc_httpcli_context* context,
306                        grpc_polling_entity* pollent,
307                        grpc_core::ResourceQuotaRefPtr resource_quota,
308                        const grpc_httpcli_request* request,
309                        const char* body_bytes, size_t body_size,
310                        grpc_millis deadline, grpc_closure* on_done,
311                        grpc_httpcli_response* response) {
312   if (g_post_override && g_post_override(request, body_bytes, body_size,
313                                          deadline, on_done, response)) {
314     return;
315   }
316   std::string name =
317       absl::StrFormat("HTTP:POST:%s:%s", request->host, request->http.path);
318   internal_request_begin(
319       context, pollent, std::move(resource_quota), request, deadline, on_done,
320       response, name.c_str(),
321       grpc_httpcli_format_post_request(request, body_bytes, body_size));
322 }
323 
grpc_httpcli_set_override(grpc_httpcli_get_override get,grpc_httpcli_post_override post)324 void grpc_httpcli_set_override(grpc_httpcli_get_override get,
325                                grpc_httpcli_post_override post) {
326   g_get_override = get;
327   g_post_override = post;
328 }
329