1 #ifndef BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601 2 #define BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601 3 4 // Copyright 2010 (C) Dean Michael Berris 5 // Copyright 2010 (C) Sinefunc, Inc. 6 // Copyright 2011 Dean Michael Berris (dberris@google.com). 7 // Copyright 2011 Google,Inc. 8 // Distributed under the Boost Software License, Version 1.0. 9 // (See accompanying file LICENSE_1_0.txt or copy at 10 // http://www.boost.org/LICENSE_1_0.txt) 11 12 #include <iterator> 13 #include <cstdint> 14 #include <boost/algorithm/string/trim.hpp> 15 #include <boost/asio/steady_timer.hpp> 16 #include <boost/asio/placeholders.hpp> 17 #include <boost/asio/strand.hpp> 18 #include <boost/asio/streambuf.hpp> 19 #include <boost/assert.hpp> 20 #include <boost/logic/tribool.hpp> 21 #include <boost/network/constants.hpp> 22 #include <boost/network/detail/debug.hpp> 23 #include <boost/network/protocol/http/algorithms/linearize.hpp> 24 #include <boost/network/protocol/http/client/connection/async_protocol_handler.hpp> 25 #include <boost/network/protocol/http/message/wrappers/host.hpp> 26 #include <boost/network/protocol/http/message/wrappers/uri.hpp> 27 #include <boost/network/protocol/http/parser/incremental.hpp> 28 #include <boost/network/protocol/http/traits/delegate_factory.hpp> 29 #include <boost/network/traits/istream.hpp> 30 #include <boost/network/traits/ostream_iterator.hpp> 31 #include <boost/network/version.hpp> 32 #include <boost/range/algorithm/transform.hpp> 33 #include <boost/range/iterator_range.hpp> 34 #include <boost/throw_exception.hpp> 35 36 namespace boost { 37 namespace network { 38 namespace http { 39 namespace impl { 40 41 template <class buffer_type> 42 struct chunk_encoding_parser { 43 typedef typename buffer_type::const_iterator const_iterator; 44 typedef boost::iterator_range<const_iterator> char_const_range; 45 chunk_encoding_parserboost::network::http::impl::chunk_encoding_parser46 chunk_encoding_parser() : state(state_t::header), chunk_size(0) {} 47 48 enum class state_t { header, header_end, data, data_end }; 49 50 state_t state; 51 size_t chunk_size; 52 buffer_type buffer; 53 54 template<typename T> update_chunk_sizeboost::network::http::impl::chunk_encoding_parser55 void update_chunk_size(boost::iterator_range<T> const &range) { 56 if (range.empty()) return; 57 std::stringstream ss; 58 ss << std::hex << range; 59 size_t size; 60 ss >> size; 61 // New digits are appended as LSBs 62 chunk_size = (chunk_size << (range.size() * 4)) | size; 63 } 64 65 template<typename T> operator ()boost::network::http::impl::chunk_encoding_parser66 char_const_range operator()(boost::iterator_range<T> const &range) { 67 auto iter = boost::begin(range); 68 auto begin = iter; 69 auto pos = boost::begin(buffer); 70 71 while (iter != boost::end(range)) switch (state) { 72 case state_t::header: 73 iter = std::find(iter, boost::end(range), '\r'); 74 update_chunk_size(boost::make_iterator_range(begin, iter)); 75 if (iter != boost::end(range)) { 76 state = state_t::header_end; 77 ++iter; 78 } 79 break; 80 81 case state_t::header_end: 82 BOOST_ASSERT(*iter == '\n'); 83 ++iter; 84 state = state_t::data; 85 break; 86 87 case state_t::data: 88 if (chunk_size == 0) { 89 BOOST_ASSERT(*iter == '\r'); 90 ++iter; 91 state = state_t::data_end; 92 } else { 93 auto len = std::min(chunk_size, 94 (size_t)std::distance(iter, boost::end(range))); 95 begin = iter; 96 iter = std::next(iter, len); 97 pos = std::copy(begin, iter, pos); 98 chunk_size -= len; 99 } 100 break; 101 102 case state_t::data_end: 103 BOOST_ASSERT(*iter == '\n'); 104 ++iter; 105 begin = iter; 106 state = state_t::header; 107 break; 108 109 default: 110 BOOST_ASSERT(false && "Bug, report this to the developers!"); 111 } 112 return boost::make_iterator_range(boost::begin(buffer), pos); 113 } 114 }; 115 116 template <class Tag, unsigned version_major, unsigned version_minor> 117 struct async_connection_base; 118 119 namespace placeholders = boost::asio::placeholders; 120 121 template <class Tag, unsigned version_major, unsigned version_minor> 122 struct http_async_connection 123 : async_connection_base<Tag, version_major, version_minor>, 124 protected http_async_protocol_handler<Tag, version_major, version_minor>, 125 std::enable_shared_from_this< 126 http_async_connection<Tag, version_major, version_minor> > { 127 http_async_connection(http_async_connection const&) = delete; 128 129 typedef async_connection_base<Tag, version_major, version_minor> base; 130 typedef http_async_protocol_handler<Tag, version_major, version_minor> 131 protocol_base; 132 typedef typename base::resolver_type resolver_type; 133 typedef typename base::resolver_base::resolver_iterator resolver_iterator; 134 typedef typename base::resolver_base::resolver_iterator_pair 135 resolver_iterator_pair; 136 typedef typename base::response response; 137 typedef typename base::string_type string_type; 138 typedef typename base::request request; 139 typedef typename base::resolver_base::resolve_function resolve_function; 140 typedef typename base::char_const_range char_const_range; 141 typedef 142 typename base::body_callback_function_type body_callback_function_type; 143 typedef 144 typename base::body_generator_function_type body_generator_function_type; 145 typedef http_async_connection<Tag, version_major, version_minor> this_type; 146 typedef typename delegate_factory<Tag>::type delegate_factory_type; 147 typedef typename delegate_factory_type::connection_delegate_ptr 148 connection_delegate_ptr; 149 typedef chunk_encoding_parser<typename protocol_base::buffer_type> chunk_encoding_parser_type; 150 http_async_connectionboost::network::http::impl::http_async_connection151 http_async_connection(resolver_type& resolver, resolve_function resolve, 152 bool follow_redirect, int timeout, 153 bool remove_chunk_markers, 154 connection_delegate_ptr delegate) 155 : timeout_(timeout), 156 remove_chunk_markers_(remove_chunk_markers), 157 timer_(resolver.get_io_service()), 158 is_timedout_(false), 159 follow_redirect_(follow_redirect), 160 resolver_(resolver), 161 resolve_(std::move(resolve)), 162 request_strand_(resolver.get_io_service()), 163 delegate_(std::move(delegate)) {} 164 165 // This is the main entry point for the connection/request pipeline. 166 // We're 167 // overriding async_connection_base<...>::start(...) here which is 168 // called 169 // by the client. startboost::network::http::impl::http_async_connection170 virtual response start(request const& request, string_type const& method, 171 bool get_body, body_callback_function_type callback, 172 body_generator_function_type generator) { 173 response response_; 174 this->init_response(response_, get_body); 175 linearize(request, method, version_major, version_minor, 176 std::ostreambuf_iterator<typename char_<Tag>::type>( 177 &command_streambuf)); 178 this->method = method; 179 std::uint16_t port_ = port(request); 180 string_type host_ = host(request); 181 std::uint16_t source_port = request.source_port(); 182 183 auto sni_hostname = request.sni_hostname(); 184 185 auto self = this->shared_from_this(); 186 resolve_(resolver_, host_, port_, 187 request_strand_.wrap( 188 [=] (boost::system::error_code const &ec, 189 resolver_iterator_pair endpoint_range) { 190 self->handle_resolved(host_, port_, source_port, sni_hostname, get_body, 191 callback, generator, ec, endpoint_range); 192 })); 193 if (timeout_ > 0) { 194 timer_.expires_from_now(std::chrono::seconds(timeout_)); 195 timer_.async_wait(request_strand_.wrap([=] (boost::system::error_code const &ec) { 196 self->handle_timeout(ec); 197 })); 198 } 199 return response_; 200 } 201 202 private: set_errorsboost::network::http::impl::http_async_connection203 void set_errors(boost::system::error_code const& ec, body_callback_function_type callback) { 204 boost::system::system_error error(ec); 205 this->version_promise.set_exception(error); 206 this->status_promise.set_exception(error); 207 this->status_message_promise.set_exception(error); 208 this->headers_promise.set_exception(error); 209 this->source_promise.set_exception(error); 210 this->destination_promise.set_exception(error); 211 this->body_promise.set_exception(error); 212 if ( callback ) 213 callback( char_const_range(), ec ); 214 this->timer_.cancel(); 215 } 216 handle_timeoutboost::network::http::impl::http_async_connection217 void handle_timeout(boost::system::error_code const& ec) { 218 if (!ec) delegate_->disconnect(); 219 is_timedout_ = true; 220 } 221 handle_resolvedboost::network::http::impl::http_async_connection222 void handle_resolved(string_type host, std::uint16_t port, 223 std::uint16_t source_port, optional<string_type> sni_hostname, bool get_body, 224 body_callback_function_type callback, 225 body_generator_function_type generator, 226 boost::system::error_code const& ec, 227 resolver_iterator_pair endpoint_range) { 228 if (!ec && !boost::empty(endpoint_range)) { 229 // Here we deal with the case that there was an error encountered and 230 // that there's still more endpoints to try connecting to. 231 resolver_iterator iter = boost::begin(endpoint_range); 232 boost::asio::ip::tcp::endpoint endpoint(iter->endpoint().address(), port); 233 auto self = this->shared_from_this(); 234 delegate_->connect( 235 endpoint, host, source_port, sni_hostname, 236 request_strand_.wrap([=] (boost::system::error_code const &ec) { 237 auto iter_copy = iter; 238 self->handle_connected(host, port, source_port, sni_hostname, get_body, callback, 239 generator, std::make_pair(++iter_copy, resolver_iterator()), ec); 240 })); 241 } else { 242 set_errors((ec ? ec : boost::asio::error::host_not_found), callback); 243 } 244 } 245 handle_connectedboost::network::http::impl::http_async_connection246 void handle_connected(string_type host, std::uint16_t port, 247 std::uint16_t source_port, optional<string_type> sni_hostname, bool get_body, 248 body_callback_function_type callback, 249 body_generator_function_type generator, 250 resolver_iterator_pair endpoint_range, 251 boost::system::error_code const& ec) { 252 if (is_timedout_) { 253 set_errors(boost::asio::error::timed_out, callback); 254 } else if (!ec) { 255 BOOST_ASSERT(delegate_.get() != 0); 256 auto self = this->shared_from_this(); 257 delegate_->write( 258 command_streambuf, 259 request_strand_.wrap([=] (boost::system::error_code const &ec, 260 std::size_t bytes_transferred) { 261 self->handle_sent_request(get_body, callback, generator, 262 ec, bytes_transferred); 263 })); 264 } else { 265 if (!boost::empty(endpoint_range)) { 266 resolver_iterator iter = boost::begin(endpoint_range); 267 boost::asio::ip::tcp::endpoint endpoint(iter->endpoint().address(), port); 268 auto self = this->shared_from_this(); 269 delegate_->connect( 270 endpoint, host, source_port, sni_hostname, 271 request_strand_.wrap([=] (boost::system::error_code const &ec) { 272 auto iter_copy = iter; 273 self->handle_connected(host, port, source_port, sni_hostname, get_body, callback, 274 generator, std::make_pair(++iter_copy, resolver_iterator()), 275 ec); 276 })); 277 } else { 278 set_errors((ec ? ec : boost::asio::error::host_not_found), callback); 279 } 280 } 281 } 282 283 enum state_t { version, status, status_message, headers, body }; 284 handle_sent_requestboost::network::http::impl::http_async_connection285 void handle_sent_request(bool get_body, body_callback_function_type callback, 286 body_generator_function_type generator, 287 boost::system::error_code const& ec, 288 std::size_t /*bytes_transferred*/) { // TODO(unassigned): use-case? 289 if (!is_timedout_ && !ec) { 290 if (generator) { 291 // Here we write some more data that the generator provides, before we 292 // wait for data from the server. 293 string_type chunk; 294 if (generator(chunk)) { 295 // At this point this means we have more data to write, so we write 296 // it out. 297 std::copy(chunk.begin(), chunk.end(), 298 std::ostreambuf_iterator<typename char_<Tag>::type>( 299 &command_streambuf)); 300 auto self = this->shared_from_this(); 301 delegate_->write( 302 command_streambuf, 303 request_strand_.wrap([=] (boost::system::error_code const &ec, 304 std::size_t bytes_transferred) { 305 self->handle_sent_request(get_body, callback, generator, 306 ec, bytes_transferred); 307 })); 308 return; 309 } 310 } 311 312 auto self = this->shared_from_this(); 313 delegate_->read_some( 314 boost::asio::mutable_buffers_1(this->part.data(), 315 this->part.size()), 316 request_strand_.wrap([=] (boost::system::error_code const &ec, 317 std::size_t bytes_transferred) { 318 self->handle_received_data(version, get_body, callback, 319 ec, bytes_transferred); 320 })); 321 } else { 322 set_errors((is_timedout_ ? boost::asio::error::timed_out : ec), callback); 323 } 324 } 325 handle_received_databoost::network::http::impl::http_async_connection326 void handle_received_data(state_t state, bool get_body, 327 body_callback_function_type callback, 328 boost::system::error_code const& ec, 329 std::size_t bytes_transferred) { 330 static const long short_read_error = 335544539; 331 bool is_ssl_short_read_error = 332 #ifdef BOOST_NETWORK_ENABLE_HTTPS 333 ec.category() == boost::asio::error::ssl_category && 334 ec.value() == short_read_error; 335 #else 336 false && short_read_error; 337 #endif 338 if (!is_timedout_ && 339 (!ec || ec == boost::asio::error::eof || is_ssl_short_read_error)) { 340 logic::tribool parsed_ok; 341 size_t remainder; 342 auto self = this->shared_from_this(); 343 switch (state) { 344 case version: 345 if (ec == boost::asio::error::eof) return; 346 parsed_ok = this->parse_version( 347 delegate_, 348 request_strand_.wrap([=] (boost::system::error_code const &ec, 349 std::size_t bytes_transferred) { 350 self->handle_received_data(version, get_body, callback, 351 ec, bytes_transferred); 352 }), 353 bytes_transferred); 354 if (!parsed_ok || indeterminate(parsed_ok)) { 355 return; 356 } 357 // fall-through 358 case status: 359 if (ec == boost::asio::error::eof) return; 360 parsed_ok = this->parse_status( 361 delegate_, 362 request_strand_.wrap([=] (boost::system::error_code const &ec, 363 std::size_t bytes_transferred) { 364 self->handle_received_data(status, get_body, callback, 365 ec, bytes_transferred); 366 }), 367 bytes_transferred); 368 if (!parsed_ok || indeterminate(parsed_ok)) { 369 return; 370 } 371 // fall-through 372 case status_message: 373 if (ec == boost::asio::error::eof) return; 374 parsed_ok = this->parse_status_message( 375 delegate_, request_strand_.wrap([=] (boost::system::error_code const &, 376 std::size_t bytes_transferred) { 377 self->handle_received_data(status_message, get_body, callback, 378 ec, bytes_transferred); 379 }), 380 bytes_transferred); 381 if (!parsed_ok || indeterminate(parsed_ok)) { 382 return; 383 } 384 // fall-through 385 case headers: 386 if (ec == boost::asio::error::eof) return; 387 // In the following, remainder is the number of bytes that remain in 388 // the buffer. We need this in the body processing to make sure that 389 // the data remaining in the buffer is dealt with before another call 390 // to get more data for the body is scheduled. 391 std::tie(parsed_ok, remainder) = this->parse_headers( 392 delegate_, 393 request_strand_.wrap([=] (boost::system::error_code const &ec, 394 std::size_t bytes_transferred) { 395 self->handle_received_data(headers, get_body, callback, 396 ec, bytes_transferred); 397 }), 398 bytes_transferred); 399 400 if (!parsed_ok || indeterminate(parsed_ok)) { 401 return; 402 } 403 404 if (!get_body) { 405 // We short-circuit here because the user does not want to get the 406 // body (in the case of a HEAD request). 407 this->body_promise.set_value(""); 408 if ( callback ) 409 callback( char_const_range(), boost::asio::error::eof ); 410 this->destination_promise.set_value(""); 411 this->source_promise.set_value(""); 412 // this->part.assign('\0'); 413 boost::copy("\0", std::begin(this->part)); 414 this->response_parser_.reset(); 415 return; 416 } 417 418 if (callback) { 419 // Here we deal with the spill-over data from the headers 420 // processing. This means the headers data has already been parsed 421 // appropriately and we're looking to treat everything that remains 422 // in the buffer. 423 typename protocol_base::buffer_type::const_iterator begin = 424 this->part_begin; 425 typename protocol_base::buffer_type::const_iterator end = begin; 426 std::advance(end, remainder); 427 428 // We're setting the body promise here to an empty string because 429 // this can be used as a signaling mechanism for the user to 430 // determine that the body is now ready for processing, even though 431 // the callback is already provided. 432 this->body_promise.set_value(""); 433 434 // The invocation of the callback is synchronous to allow us to 435 // wait before scheduling another read. 436 if (this->is_chunk_encoding && remove_chunk_markers_) { 437 callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec); 438 } else { 439 callback(make_iterator_range(begin, end), ec); 440 } 441 auto self = this->shared_from_this(); 442 delegate_->read_some( 443 boost::asio::mutable_buffers_1(this->part.data(), 444 this->part.size()), 445 request_strand_.wrap([=] (boost::system::error_code const &ec, 446 std::size_t bytes_transferred) { 447 self->handle_received_data(body, get_body, callback, 448 ec, bytes_transferred); 449 })); 450 } else { 451 // Here we handle the body data ourself and append to an 452 // ever-growing string buffer. 453 auto self = this->shared_from_this(); 454 this->parse_body( 455 delegate_, 456 request_strand_.wrap([=] (boost::system::error_code const &ec, 457 std::size_t bytes_transferred) { 458 self->handle_received_data(body, get_body, callback, 459 ec, bytes_transferred); 460 }), 461 remainder); 462 } 463 return; 464 case body: 465 if (ec == boost::asio::error::eof || is_ssl_short_read_error) { 466 // Here we're handling the case when the connection has been closed 467 // from the server side, or at least that the end of file has been 468 // reached while reading the socket. This signals the end of the 469 // body processing chain. 470 if (callback) { 471 typename protocol_base::buffer_type::const_iterator 472 begin = this->part.begin(), 473 end = begin; 474 std::advance(end, bytes_transferred); 475 476 // We call the callback function synchronously passing the error 477 // condition (in this case, end of file) so that it can handle it 478 // appropriately. 479 if (this->is_chunk_encoding && remove_chunk_markers_) { 480 callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec); 481 } else { 482 callback(make_iterator_range(begin, end), ec); 483 } 484 } else { 485 string_type body_string; 486 if (this->is_chunk_encoding && remove_chunk_markers_) { 487 const auto parse_buffer_size = parse_chunk_encoding.buffer.size(); 488 for (size_t i = 0; i < this->partial_parsed.size(); i += parse_buffer_size) { 489 auto range = parse_chunk_encoding(boost::make_iterator_range( 490 this->partial_parsed.cbegin() + i, 491 this->partial_parsed.cbegin() + 492 std::min(i + parse_buffer_size, this->partial_parsed.size()))); 493 body_string.append(boost::begin(range), boost::end(range)); 494 } 495 this->partial_parsed.clear(); 496 auto range = parse_chunk_encoding(boost::make_iterator_range( 497 this->part.begin(), 498 this->part.begin() + bytes_transferred)); 499 body_string.append(boost::begin(range), boost::end(range)); 500 this->body_promise.set_value(body_string); 501 } else { 502 std::swap(body_string, this->partial_parsed); 503 body_string.append(this->part.begin(), 504 this->part.begin() + bytes_transferred); 505 this->body_promise.set_value(body_string); 506 } 507 } 508 // TODO(dberris): set the destination value somewhere! 509 this->destination_promise.set_value(""); 510 this->source_promise.set_value(""); 511 // this->part.assign('\0'); 512 boost::copy("\0", std::begin(this->part)); 513 this->response_parser_.reset(); 514 this->timer_.cancel(); 515 } else { 516 // This means the connection has not been closed yet and we want to 517 // get more data. 518 if (callback) { 519 // Here we have a body_handler callback. Let's invoke the 520 // callback from here and make sure we're getting more data right 521 // after. 522 typename protocol_base::buffer_type::const_iterator begin = 523 this->part.begin(); 524 typename protocol_base::buffer_type::const_iterator end = begin; 525 std::advance(end, bytes_transferred); 526 if (this->is_chunk_encoding && remove_chunk_markers_) { 527 callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec); 528 } else { 529 callback(make_iterator_range(begin, end), ec); 530 } 531 auto self = this->shared_from_this(); 532 delegate_->read_some( 533 boost::asio::mutable_buffers_1(this->part.data(), 534 this->part.size()), 535 request_strand_.wrap([=] (boost::system::error_code const &ec, 536 std::size_t bytes_transferred) { 537 self->handle_received_data(body, get_body, callback, 538 ec, bytes_transferred); 539 })); 540 } else { 541 // Here we don't have a body callback. Let's make sure that we 542 // deal with the remainder from the headers part in case we do 543 // have data that's still in the buffer. 544 this->parse_body( 545 delegate_, 546 request_strand_.wrap([=] (boost::system::error_code const &ec, 547 std::size_t bytes_transferred) { 548 self->handle_received_data(body, get_body, callback, 549 ec, bytes_transferred); 550 }), 551 bytes_transferred); 552 } 553 } 554 return; 555 default: 556 BOOST_ASSERT(false && "Bug, report this to the developers!"); 557 } 558 } else { 559 boost::system::error_code report_code = is_timedout_ ? boost::asio::error::timed_out : ec; 560 boost::system::system_error error(report_code); 561 this->source_promise.set_exception(error); 562 this->destination_promise.set_exception(error); 563 switch (state) { 564 case version: 565 this->version_promise.set_exception(error); 566 // fall-through 567 case status: 568 this->status_promise.set_exception(error); 569 // fall-through 570 case status_message: 571 this->status_message_promise.set_exception(error); 572 // fall-through 573 case headers: 574 this->headers_promise.set_exception(error); 575 // fall-through 576 case body: 577 if (!callback) { 578 // N.B. if callback is non-null, then body_promise has already been 579 // set to value "" to indicate body is handled by streaming handler 580 // so no exception should be set 581 this->body_promise.set_exception(error); 582 } 583 else 584 callback( char_const_range(), report_code ); 585 break; 586 // fall-through 587 default: 588 BOOST_ASSERT(false && "Bug, report this to the developers!"); 589 } 590 } 591 } 592 593 int timeout_; 594 bool remove_chunk_markers_; 595 boost::asio::steady_timer timer_; 596 bool is_timedout_; 597 bool follow_redirect_; 598 resolver_type& resolver_; 599 resolve_function resolve_; 600 boost::asio::io_service::strand request_strand_; 601 connection_delegate_ptr delegate_; 602 boost::asio::streambuf command_streambuf; 603 string_type method; 604 chunk_encoding_parser_type parse_chunk_encoding; 605 }; 606 607 } // namespace impl 608 } // namespace http 609 } // namespace network 610 } // namespace boost 611 612 #endif // BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601 613