1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/http/httpcli.h"
22
23 #include <string.h>
24
25 #include <string>
26
27 #include "absl/strings/str_format.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/string_util.h>
34
35 #include "src/core/lib/address_utils/sockaddr_utils.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gpr/string.h"
38 #include "src/core/lib/gprpp/memory.h"
39 #include "src/core/lib/http/format_request.h"
40 #include "src/core/lib/http/parser.h"
41 #include "src/core/lib/iomgr/endpoint.h"
42 #include "src/core/lib/iomgr/iomgr_internal.h"
43 #include "src/core/lib/iomgr/resolve_address.h"
44 #include "src/core/lib/iomgr/tcp_client.h"
45 #include "src/core/lib/resource_quota/api.h"
46 #include "src/core/lib/slice/slice_internal.h"
47
48 namespace grpc_core {
49 namespace {
50
51 class InternalRequest {
52 public:
InternalRequest(const grpc_slice & request_text,grpc_httpcli_response * response,ResourceQuotaRefPtr resource_quota,absl::string_view host,absl::string_view ssl_host_override,grpc_millis deadline,const grpc_httpcli_handshaker * handshaker,grpc_closure * on_done,grpc_httpcli_context * context,grpc_polling_entity * pollent,const char * name)53 InternalRequest(const grpc_slice& request_text,
54 grpc_httpcli_response* response,
55 ResourceQuotaRefPtr resource_quota, absl::string_view host,
56 absl::string_view ssl_host_override, grpc_millis deadline,
57 const grpc_httpcli_handshaker* handshaker,
58 grpc_closure* on_done, grpc_httpcli_context* context,
59 grpc_polling_entity* pollent, const char* name)
60 : request_text_(request_text),
61 resource_quota_(std::move(resource_quota)),
62 host_(host),
63 ssl_host_override_(ssl_host_override),
64 deadline_(deadline),
65 handshaker_(handshaker),
66 on_done_(on_done),
67 context_(context),
68 pollent_(pollent) {
69 grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
70 grpc_slice_buffer_init(&incoming_);
71 grpc_slice_buffer_init(&outgoing_);
72 grpc_iomgr_register_object(&iomgr_obj_, name);
73
74 GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
75 GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
76 GPR_ASSERT(pollent);
77 grpc_polling_entity_add_to_pollset_set(pollent_, context->pollset_set);
78 grpc_resolve_address(
79 host_.c_str(), handshaker_->default_port, context_->pollset_set,
80 GRPC_CLOSURE_CREATE(OnResolved, this, grpc_schedule_on_exec_ctx),
81 &addresses_);
82 }
83
~InternalRequest()84 ~InternalRequest() {
85 grpc_http_parser_destroy(&parser_);
86 if (addresses_ != nullptr) {
87 grpc_resolved_addresses_destroy(addresses_);
88 }
89 if (ep_ != nullptr) {
90 grpc_endpoint_destroy(ep_);
91 }
92 grpc_slice_unref_internal(request_text_);
93 grpc_iomgr_unregister_object(&iomgr_obj_);
94 grpc_slice_buffer_destroy_internal(&incoming_);
95 grpc_slice_buffer_destroy_internal(&outgoing_);
96 GRPC_ERROR_UNREF(overall_error_);
97 }
98
99 private:
Finish(grpc_error_handle error)100 void Finish(grpc_error_handle error) {
101 grpc_polling_entity_del_from_pollset_set(pollent_, context_->pollset_set);
102 ExecCtx::Run(DEBUG_LOCATION, on_done_, error);
103 delete this;
104 }
105
AppendError(grpc_error_handle error)106 void AppendError(grpc_error_handle error) {
107 if (overall_error_ == GRPC_ERROR_NONE) {
108 overall_error_ =
109 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request");
110 }
111 grpc_resolved_address* addr = &addresses_->addrs[next_address_ - 1];
112 std::string addr_text = grpc_sockaddr_to_uri(addr);
113 overall_error_ = grpc_error_add_child(
114 overall_error_,
115 grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text));
116 }
117
DoRead()118 void DoRead() {
119 grpc_endpoint_read(ep_, &incoming_, &on_read_, /*urgent=*/true);
120 }
121
OnRead(void * user_data,grpc_error_handle error)122 static void OnRead(void* user_data, grpc_error_handle error) {
123 InternalRequest* req = static_cast<InternalRequest*>(user_data);
124 req->OnReadInternal(error);
125 }
126
OnReadInternal(grpc_error_handle error)127 void OnReadInternal(grpc_error_handle error) {
128 size_t i;
129
130 for (i = 0; i < incoming_.count; i++) {
131 if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
132 have_read_byte_ = 1;
133 grpc_error_handle err =
134 grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
135 if (err != GRPC_ERROR_NONE) {
136 Finish(err);
137 return;
138 }
139 }
140 }
141
142 if (error == GRPC_ERROR_NONE) {
143 DoRead();
144 } else if (!have_read_byte_) {
145 NextAddress(GRPC_ERROR_REF(error));
146 } else {
147 Finish(grpc_http_parser_eof(&parser_));
148 }
149 }
150
OnWritten()151 void OnWritten() { DoRead(); }
152
DoneWrite(void * arg,grpc_error_handle error)153 static void DoneWrite(void* arg, grpc_error_handle error) {
154 InternalRequest* req = static_cast<InternalRequest*>(arg);
155 if (error == GRPC_ERROR_NONE) {
156 req->OnWritten();
157 } else {
158 req->NextAddress(GRPC_ERROR_REF(error));
159 }
160 }
161
StartWrite()162 void StartWrite() {
163 grpc_slice_ref_internal(request_text_);
164 grpc_slice_buffer_add(&outgoing_, request_text_);
165 grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr);
166 }
167
OnHandshakeDone(void * arg,grpc_endpoint * ep)168 static void OnHandshakeDone(void* arg, grpc_endpoint* ep) {
169 InternalRequest* req = static_cast<InternalRequest*>(arg);
170
171 if (!ep) {
172 req->NextAddress(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
173 "Unexplained handshake failure"));
174 return;
175 }
176
177 req->ep_ = ep;
178 req->StartWrite();
179 }
180
OnConnected(void * arg,grpc_error_handle error)181 static void OnConnected(void* arg, grpc_error_handle error) {
182 InternalRequest* req = static_cast<InternalRequest*>(arg);
183
184 if (!req->ep_) {
185 req->NextAddress(GRPC_ERROR_REF(error));
186 return;
187 }
188 req->handshaker_->handshake(req, req->ep_,
189 req->ssl_host_override_.empty()
190 ? req->host_.c_str()
191 : req->ssl_host_override_.c_str(),
192 req->deadline_, OnHandshakeDone);
193 }
194
NextAddress(grpc_error_handle error)195 void NextAddress(grpc_error_handle error) {
196 grpc_resolved_address* addr;
197 if (error != GRPC_ERROR_NONE) {
198 AppendError(error);
199 }
200 if (next_address_ == addresses_->naddrs) {
201 Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
202 "Failed HTTP requests to all targets", &overall_error_, 1));
203 return;
204 }
205 addr = &addresses_->addrs[next_address_++];
206 GRPC_CLOSURE_INIT(&connected_, OnConnected, this,
207 grpc_schedule_on_exec_ctx);
208 grpc_arg rq_arg = grpc_channel_arg_pointer_create(
209 const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA), resource_quota_->c_ptr(),
210 grpc_resource_quota_arg_vtable());
211 grpc_channel_args channel_args{1, &rq_arg};
212 auto* args = CoreConfiguration::Get()
213 .channel_args_preconditioning()
214 .PreconditionChannelArgs(&channel_args);
215 grpc_tcp_client_connect(&connected_, &ep_, context_->pollset_set, args,
216 addr, deadline_);
217 grpc_channel_args_destroy(args);
218 }
219
OnResolved(void * arg,grpc_error_handle error)220 static void OnResolved(void* arg, grpc_error_handle error) {
221 InternalRequest* req = static_cast<InternalRequest*>(arg);
222 if (error != GRPC_ERROR_NONE) {
223 req->Finish(GRPC_ERROR_REF(error));
224 return;
225 }
226 req->next_address_ = 0;
227 req->NextAddress(GRPC_ERROR_NONE);
228 }
229
230 grpc_slice request_text_;
231 grpc_http_parser parser_;
232 grpc_resolved_addresses* addresses_ = nullptr;
233 size_t next_address_ = 0;
234 grpc_endpoint* ep_ = nullptr;
235 ResourceQuotaRefPtr resource_quota_;
236 std::string host_;
237 std::string ssl_host_override_;
238 grpc_millis deadline_;
239 int have_read_byte_ = 0;
240 const grpc_httpcli_handshaker* handshaker_;
241 grpc_closure* on_done_;
242 grpc_httpcli_context* context_;
243 grpc_polling_entity* pollent_;
244 grpc_iomgr_object iomgr_obj_;
245 grpc_slice_buffer incoming_;
246 grpc_slice_buffer outgoing_;
247 grpc_closure on_read_;
248 grpc_closure done_write_;
249 grpc_closure connected_;
250 grpc_error_handle overall_error_ = GRPC_ERROR_NONE;
251 };
252
253 } // namespace
254 } // namespace grpc_core
255
256 static grpc_httpcli_get_override g_get_override = nullptr;
257 static grpc_httpcli_post_override g_post_override = nullptr;
258
plaintext_handshake(void * arg,grpc_endpoint * endpoint,const char *,grpc_millis,void (* on_done)(void * arg,grpc_endpoint * endpoint))259 static void plaintext_handshake(void* arg, grpc_endpoint* endpoint,
260 const char* /*host*/, grpc_millis /*deadline*/,
261 void (*on_done)(void* arg,
262 grpc_endpoint* endpoint)) {
263 on_done(arg, endpoint);
264 }
265
266 const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
267 plaintext_handshake};
268
grpc_httpcli_context_init(grpc_httpcli_context * context)269 void grpc_httpcli_context_init(grpc_httpcli_context* context) {
270 context->pollset_set = grpc_pollset_set_create();
271 }
272
grpc_httpcli_context_destroy(grpc_httpcli_context * context)273 void grpc_httpcli_context_destroy(grpc_httpcli_context* context) {
274 grpc_pollset_set_destroy(context->pollset_set);
275 }
276
internal_request_begin(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response,const char * name,const grpc_slice & request_text)277 static void internal_request_begin(
278 grpc_httpcli_context* context, grpc_polling_entity* pollent,
279 grpc_core::ResourceQuotaRefPtr resource_quota,
280 const grpc_httpcli_request* request, grpc_millis deadline,
281 grpc_closure* on_done, grpc_httpcli_response* response, const char* name,
282 const grpc_slice& request_text) {
283 new grpc_core::InternalRequest(
284 request_text, response, std::move(resource_quota), request->host,
285 request->ssl_host_override, deadline,
286 request->handshaker ? request->handshaker : &grpc_httpcli_plaintext,
287 on_done, context, pollent, name);
288 }
289
grpc_httpcli_get(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response)290 void grpc_httpcli_get(grpc_httpcli_context* context,
291 grpc_polling_entity* pollent,
292 grpc_core::ResourceQuotaRefPtr resource_quota,
293 const grpc_httpcli_request* request, grpc_millis deadline,
294 grpc_closure* on_done, grpc_httpcli_response* response) {
295 if (g_get_override && g_get_override(request, deadline, on_done, response)) {
296 return;
297 }
298 std::string name =
299 absl::StrFormat("HTTP:GET:%s:%s", request->host, request->http.path);
300 internal_request_begin(context, pollent, std::move(resource_quota), request,
301 deadline, on_done, response, name.c_str(),
302 grpc_httpcli_format_get_request(request));
303 }
304
grpc_httpcli_post(grpc_httpcli_context * context,grpc_polling_entity * pollent,grpc_core::ResourceQuotaRefPtr resource_quota,const grpc_httpcli_request * request,const char * body_bytes,size_t body_size,grpc_millis deadline,grpc_closure * on_done,grpc_httpcli_response * response)305 void grpc_httpcli_post(grpc_httpcli_context* context,
306 grpc_polling_entity* pollent,
307 grpc_core::ResourceQuotaRefPtr resource_quota,
308 const grpc_httpcli_request* request,
309 const char* body_bytes, size_t body_size,
310 grpc_millis deadline, grpc_closure* on_done,
311 grpc_httpcli_response* response) {
312 if (g_post_override && g_post_override(request, body_bytes, body_size,
313 deadline, on_done, response)) {
314 return;
315 }
316 std::string name =
317 absl::StrFormat("HTTP:POST:%s:%s", request->host, request->http.path);
318 internal_request_begin(
319 context, pollent, std::move(resource_quota), request, deadline, on_done,
320 response, name.c_str(),
321 grpc_httpcli_format_post_request(request, body_bytes, body_size));
322 }
323
grpc_httpcli_set_override(grpc_httpcli_get_override get,grpc_httpcli_post_override post)324 void grpc_httpcli_set_override(grpc_httpcli_get_override get,
325 grpc_httpcli_post_override post) {
326 g_get_override = get;
327 g_post_override = post;
328 }
329