1 #ifndef BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601
2 #define BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601
3 
4 // Copyright 2010 (C) Dean Michael Berris
5 // Copyright 2010 (C) Sinefunc, Inc.
6 // Copyright 2011 Dean Michael Berris (dberris@google.com).
7 // Copyright 2011 Google,Inc.
8 // Distributed under the Boost Software License, Version 1.0.
9 // (See accompanying file LICENSE_1_0.txt or copy at
10 // http://www.boost.org/LICENSE_1_0.txt)
11 
12 #include <iterator>
13 #include <cstdint>
14 #include <boost/algorithm/string/trim.hpp>
15 #include <boost/asio/steady_timer.hpp>
16 #include <boost/asio/placeholders.hpp>
17 #include <boost/asio/strand.hpp>
18 #include <boost/asio/streambuf.hpp>
19 #include <boost/assert.hpp>
20 #include <boost/logic/tribool.hpp>
21 #include <boost/network/constants.hpp>
22 #include <boost/network/detail/debug.hpp>
23 #include <boost/network/protocol/http/algorithms/linearize.hpp>
24 #include <boost/network/protocol/http/client/connection/async_protocol_handler.hpp>
25 #include <boost/network/protocol/http/message/wrappers/host.hpp>
26 #include <boost/network/protocol/http/message/wrappers/uri.hpp>
27 #include <boost/network/protocol/http/parser/incremental.hpp>
28 #include <boost/network/protocol/http/traits/delegate_factory.hpp>
29 #include <boost/network/traits/istream.hpp>
30 #include <boost/network/traits/ostream_iterator.hpp>
31 #include <boost/network/version.hpp>
32 #include <boost/range/algorithm/transform.hpp>
33 #include <boost/range/iterator_range.hpp>
34 #include <boost/throw_exception.hpp>
35 
36 namespace boost {
37 namespace network {
38 namespace http {
39 namespace impl {
40 
41 template <class buffer_type>
42 struct chunk_encoding_parser {
43   typedef typename buffer_type::const_iterator const_iterator;
44   typedef boost::iterator_range<const_iterator> char_const_range;
45 
chunk_encoding_parserboost::network::http::impl::chunk_encoding_parser46   chunk_encoding_parser() : state(state_t::header), chunk_size(0) {}
47 
48   enum class state_t { header, header_end, data, data_end };
49 
50   state_t state;
51   size_t chunk_size;
52   buffer_type buffer;
53 
54   template<typename T>
update_chunk_sizeboost::network::http::impl::chunk_encoding_parser55   void update_chunk_size(boost::iterator_range<T> const &range) {
56     if (range.empty()) return;
57     std::stringstream ss;
58     ss << std::hex << range;
59     size_t size;
60     ss >> size;
61     // New digits are appended as LSBs
62     chunk_size = (chunk_size << (range.size() * 4)) | size;
63   }
64 
65   template<typename T>
operator ()boost::network::http::impl::chunk_encoding_parser66   char_const_range operator()(boost::iterator_range<T> const &range) {
67     auto iter = boost::begin(range);
68     auto begin = iter;
69     auto pos = boost::begin(buffer);
70 
71     while (iter != boost::end(range)) switch (state) {
72         case state_t::header:
73           iter = std::find(iter, boost::end(range), '\r');
74           update_chunk_size(boost::make_iterator_range(begin, iter));
75           if (iter != boost::end(range)) {
76             state = state_t::header_end;
77             ++iter;
78           }
79           break;
80 
81         case state_t::header_end:
82           BOOST_ASSERT(*iter == '\n');
83           ++iter;
84           state = state_t::data;
85           break;
86 
87         case state_t::data:
88           if (chunk_size == 0) {
89             BOOST_ASSERT(*iter == '\r');
90             ++iter;
91             state = state_t::data_end;
92           } else {
93             auto len = std::min(chunk_size,
94                                 (size_t)std::distance(iter, boost::end(range)));
95             begin = iter;
96             iter = std::next(iter, len);
97             pos = std::copy(begin, iter, pos);
98             chunk_size -= len;
99           }
100           break;
101 
102         case state_t::data_end:
103           BOOST_ASSERT(*iter == '\n');
104           ++iter;
105           begin = iter;
106           state = state_t::header;
107           break;
108 
109         default:
110           BOOST_ASSERT(false && "Bug, report this to the developers!");
111       }
112     return boost::make_iterator_range(boost::begin(buffer), pos);
113   }
114 };
115 
116 template <class Tag, unsigned version_major, unsigned version_minor>
117 struct async_connection_base;
118 
119 namespace placeholders = boost::asio::placeholders;
120 
121 template <class Tag, unsigned version_major, unsigned version_minor>
122 struct http_async_connection
123     : async_connection_base<Tag, version_major, version_minor>,
124       protected http_async_protocol_handler<Tag, version_major, version_minor>,
125       std::enable_shared_from_this<
126           http_async_connection<Tag, version_major, version_minor> > {
127   http_async_connection(http_async_connection const&) = delete;
128 
129   typedef async_connection_base<Tag, version_major, version_minor> base;
130   typedef http_async_protocol_handler<Tag, version_major, version_minor>
131       protocol_base;
132   typedef typename base::resolver_type resolver_type;
133   typedef typename base::resolver_base::resolver_iterator resolver_iterator;
134   typedef typename base::resolver_base::resolver_iterator_pair
135       resolver_iterator_pair;
136   typedef typename base::response response;
137   typedef typename base::string_type string_type;
138   typedef typename base::request request;
139   typedef typename base::resolver_base::resolve_function resolve_function;
140   typedef typename base::char_const_range char_const_range;
141   typedef
142       typename base::body_callback_function_type body_callback_function_type;
143   typedef
144       typename base::body_generator_function_type body_generator_function_type;
145   typedef http_async_connection<Tag, version_major, version_minor> this_type;
146   typedef typename delegate_factory<Tag>::type delegate_factory_type;
147   typedef typename delegate_factory_type::connection_delegate_ptr
148       connection_delegate_ptr;
149   typedef chunk_encoding_parser<typename protocol_base::buffer_type> chunk_encoding_parser_type;
150 
http_async_connectionboost::network::http::impl::http_async_connection151   http_async_connection(resolver_type& resolver, resolve_function resolve,
152                         bool follow_redirect, int timeout,
153                         bool remove_chunk_markers,
154                         connection_delegate_ptr delegate)
155       : timeout_(timeout),
156         remove_chunk_markers_(remove_chunk_markers),
157         timer_(resolver.get_io_service()),
158         is_timedout_(false),
159         follow_redirect_(follow_redirect),
160         resolver_(resolver),
161         resolve_(std::move(resolve)),
162         request_strand_(resolver.get_io_service()),
163         delegate_(std::move(delegate)) {}
164 
165   // This is the main entry point for the connection/request pipeline.
166   // We're
167   // overriding async_connection_base<...>::start(...) here which is
168   // called
169   // by the client.
startboost::network::http::impl::http_async_connection170   virtual response start(request const& request, string_type const& method,
171                          bool get_body, body_callback_function_type callback,
172                          body_generator_function_type generator) {
173     response response_;
174     this->init_response(response_, get_body);
175     linearize(request, method, version_major, version_minor,
176               std::ostreambuf_iterator<typename char_<Tag>::type>(
177                   &command_streambuf));
178     this->method = method;
179     std::uint16_t port_ = port(request);
180     string_type host_ = host(request);
181     std::uint16_t source_port = request.source_port();
182 
183     auto sni_hostname = request.sni_hostname();
184 
185     auto self = this->shared_from_this();
186     resolve_(resolver_, host_, port_,
187              request_strand_.wrap(
188                                   [=] (boost::system::error_code const &ec,
189                                        resolver_iterator_pair endpoint_range) {
190                                     self->handle_resolved(host_, port_, source_port, sni_hostname, get_body,
191                                                           callback, generator, ec, endpoint_range);
192                                   }));
193     if (timeout_ > 0) {
194       timer_.expires_from_now(std::chrono::seconds(timeout_));
195       timer_.async_wait(request_strand_.wrap([=] (boost::system::error_code const &ec) {
196             self->handle_timeout(ec);
197           }));
198     }
199     return response_;
200   }
201 
202  private:
set_errorsboost::network::http::impl::http_async_connection203   void set_errors(boost::system::error_code const& ec, body_callback_function_type callback) {
204     boost::system::system_error error(ec);
205     this->version_promise.set_exception(error);
206     this->status_promise.set_exception(error);
207     this->status_message_promise.set_exception(error);
208     this->headers_promise.set_exception(error);
209     this->source_promise.set_exception(error);
210     this->destination_promise.set_exception(error);
211     this->body_promise.set_exception(error);
212     if ( callback )
213       callback( char_const_range(), ec );
214     this->timer_.cancel();
215   }
216 
handle_timeoutboost::network::http::impl::http_async_connection217   void handle_timeout(boost::system::error_code const& ec) {
218     if (!ec) delegate_->disconnect();
219     is_timedout_ = true;
220   }
221 
handle_resolvedboost::network::http::impl::http_async_connection222   void handle_resolved(string_type host, std::uint16_t port,
223                        std::uint16_t source_port, optional<string_type> sni_hostname, bool get_body,
224                        body_callback_function_type callback,
225                        body_generator_function_type generator,
226                        boost::system::error_code const& ec,
227                        resolver_iterator_pair endpoint_range) {
228     if (!ec && !boost::empty(endpoint_range)) {
229       // Here we deal with the case that there was an error encountered and
230       // that there's still more endpoints to try connecting to.
231       resolver_iterator iter = boost::begin(endpoint_range);
232       boost::asio::ip::tcp::endpoint endpoint(iter->endpoint().address(), port);
233       auto self = this->shared_from_this();
234       delegate_->connect(
235           endpoint, host, source_port, sni_hostname,
236           request_strand_.wrap([=] (boost::system::error_code const &ec) {
237               auto iter_copy = iter;
238               self->handle_connected(host, port, source_port, sni_hostname, get_body, callback,
239                                      generator, std::make_pair(++iter_copy, resolver_iterator()), ec);
240             }));
241     } else {
242       set_errors((ec ? ec : boost::asio::error::host_not_found), callback);
243     }
244   }
245 
handle_connectedboost::network::http::impl::http_async_connection246   void handle_connected(string_type host, std::uint16_t port,
247                         std::uint16_t source_port, optional<string_type> sni_hostname, bool get_body,
248                         body_callback_function_type callback,
249                         body_generator_function_type generator,
250                         resolver_iterator_pair endpoint_range,
251                         boost::system::error_code const& ec) {
252     if (is_timedout_) {
253       set_errors(boost::asio::error::timed_out, callback);
254     } else if (!ec) {
255       BOOST_ASSERT(delegate_.get() != 0);
256       auto self = this->shared_from_this();
257       delegate_->write(
258           command_streambuf,
259           request_strand_.wrap([=] (boost::system::error_code const &ec,
260                                     std::size_t bytes_transferred) {
261                                  self->handle_sent_request(get_body, callback, generator,
262                                                            ec, bytes_transferred);
263                                }));
264     } else {
265       if (!boost::empty(endpoint_range)) {
266         resolver_iterator iter = boost::begin(endpoint_range);
267         boost::asio::ip::tcp::endpoint endpoint(iter->endpoint().address(), port);
268         auto self = this->shared_from_this();
269         delegate_->connect(
270             endpoint, host, source_port, sni_hostname,
271             request_strand_.wrap([=] (boost::system::error_code const &ec) {
272                 auto iter_copy = iter;
273                 self->handle_connected(host, port, source_port, sni_hostname, get_body, callback,
274                                        generator, std::make_pair(++iter_copy, resolver_iterator()),
275                                        ec);
276               }));
277       } else {
278         set_errors((ec ? ec : boost::asio::error::host_not_found), callback);
279       }
280     }
281   }
282 
283   enum state_t { version, status, status_message, headers, body };
284 
handle_sent_requestboost::network::http::impl::http_async_connection285   void handle_sent_request(bool get_body, body_callback_function_type callback,
286                            body_generator_function_type generator,
287                            boost::system::error_code const& ec,
288                            std::size_t /*bytes_transferred*/) {  // TODO(unassigned): use-case?
289     if (!is_timedout_ && !ec) {
290       if (generator) {
291         // Here we write some more data that the generator provides, before we
292         // wait for data from the server.
293         string_type chunk;
294         if (generator(chunk)) {
295           // At this point this means we have more data to write, so we write
296           // it out.
297           std::copy(chunk.begin(), chunk.end(),
298                     std::ostreambuf_iterator<typename char_<Tag>::type>(
299                         &command_streambuf));
300           auto self = this->shared_from_this();
301           delegate_->write(
302               command_streambuf,
303               request_strand_.wrap([=] (boost::system::error_code const &ec,
304                                         std::size_t bytes_transferred) {
305                                      self->handle_sent_request(get_body, callback, generator,
306                                                                ec, bytes_transferred);
307                                    }));
308           return;
309         }
310       }
311 
312       auto self = this->shared_from_this();
313       delegate_->read_some(
314           boost::asio::mutable_buffers_1(this->part.data(),
315                                          this->part.size()),
316           request_strand_.wrap([=] (boost::system::error_code const &ec,
317                                     std::size_t bytes_transferred) {
318                                  self->handle_received_data(version, get_body, callback,
319                                                             ec, bytes_transferred);
320                                }));
321     } else {
322       set_errors((is_timedout_ ? boost::asio::error::timed_out : ec), callback);
323     }
324   }
325 
handle_received_databoost::network::http::impl::http_async_connection326   void handle_received_data(state_t state, bool get_body,
327                             body_callback_function_type callback,
328                             boost::system::error_code const& ec,
329                             std::size_t bytes_transferred) {
330     static const long short_read_error = 335544539;
331     bool is_ssl_short_read_error =
332 #ifdef BOOST_NETWORK_ENABLE_HTTPS
333         ec.category() == boost::asio::error::ssl_category &&
334         ec.value() == short_read_error;
335 #else
336         false && short_read_error;
337 #endif
338     if (!is_timedout_ &&
339         (!ec || ec == boost::asio::error::eof || is_ssl_short_read_error)) {
340       logic::tribool parsed_ok;
341       size_t remainder;
342       auto self = this->shared_from_this();
343       switch (state) {
344         case version:
345           if (ec == boost::asio::error::eof) return;
346           parsed_ok = this->parse_version(
347               delegate_,
348               request_strand_.wrap([=] (boost::system::error_code const &ec,
349                                         std::size_t bytes_transferred) {
350                                      self->handle_received_data(version, get_body, callback,
351                                                                 ec, bytes_transferred);
352                                    }),
353               bytes_transferred);
354           if (!parsed_ok || indeterminate(parsed_ok)) {
355             return;
356           }
357           // fall-through
358         case status:
359           if (ec == boost::asio::error::eof) return;
360           parsed_ok = this->parse_status(
361               delegate_,
362               request_strand_.wrap([=] (boost::system::error_code const &ec,
363                                         std::size_t bytes_transferred) {
364                                      self->handle_received_data(status, get_body, callback,
365                                                                 ec, bytes_transferred);
366                                    }),
367               bytes_transferred);
368           if (!parsed_ok || indeterminate(parsed_ok)) {
369             return;
370           }
371           // fall-through
372         case status_message:
373           if (ec == boost::asio::error::eof) return;
374           parsed_ok = this->parse_status_message(
375               delegate_, request_strand_.wrap([=] (boost::system::error_code const &,
376                                                    std::size_t bytes_transferred) {
377                                                 self->handle_received_data(status_message, get_body, callback,
378                                                                            ec, bytes_transferred);
379                                               }),
380               bytes_transferred);
381           if (!parsed_ok || indeterminate(parsed_ok)) {
382             return;
383           }
384           // fall-through
385         case headers:
386           if (ec == boost::asio::error::eof) return;
387           // In the following, remainder is the number of bytes that remain in
388           // the buffer. We need this in the body processing to make sure that
389           // the data remaining in the buffer is dealt with before another call
390           // to get more data for the body is scheduled.
391           std::tie(parsed_ok, remainder) = this->parse_headers(
392               delegate_,
393               request_strand_.wrap([=] (boost::system::error_code const &ec,
394                                         std::size_t bytes_transferred) {
395                                      self->handle_received_data(headers, get_body, callback,
396                                                                 ec, bytes_transferred);
397                                    }),
398               bytes_transferred);
399 
400           if (!parsed_ok || indeterminate(parsed_ok)) {
401             return;
402           }
403 
404           if (!get_body) {
405             // We short-circuit here because the user does not want to get the
406             // body (in the case of a HEAD request).
407             this->body_promise.set_value("");
408             if ( callback )
409               callback( char_const_range(), boost::asio::error::eof );
410             this->destination_promise.set_value("");
411             this->source_promise.set_value("");
412             // this->part.assign('\0');
413             boost::copy("\0", std::begin(this->part));
414             this->response_parser_.reset();
415             return;
416           }
417 
418           if (callback) {
419             // Here we deal with the spill-over data from the headers
420             // processing. This means the headers data has already been parsed
421             // appropriately and we're looking to treat everything that remains
422             // in the buffer.
423             typename protocol_base::buffer_type::const_iterator begin =
424                 this->part_begin;
425             typename protocol_base::buffer_type::const_iterator end = begin;
426             std::advance(end, remainder);
427 
428             // We're setting the body promise here to an empty string because
429             // this can be used as a signaling mechanism for the user to
430             // determine that the body is now ready for processing, even though
431             // the callback is already provided.
432             this->body_promise.set_value("");
433 
434             // The invocation of the callback is synchronous to allow us to
435             // wait before scheduling another read.
436             if (this->is_chunk_encoding && remove_chunk_markers_) {
437               callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec);
438             } else {
439               callback(make_iterator_range(begin, end), ec);
440             }
441             auto self = this->shared_from_this();
442             delegate_->read_some(
443                 boost::asio::mutable_buffers_1(this->part.data(),
444                                                this->part.size()),
445                 request_strand_.wrap([=] (boost::system::error_code const &ec,
446                                           std::size_t bytes_transferred) {
447                                        self->handle_received_data(body, get_body, callback,
448                                                                   ec, bytes_transferred);
449                                      }));
450           } else {
451             // Here we handle the body data ourself and append to an
452             // ever-growing string buffer.
453             auto self = this->shared_from_this();
454             this->parse_body(
455                 delegate_,
456                 request_strand_.wrap([=] (boost::system::error_code const &ec,
457                                           std::size_t bytes_transferred) {
458                                        self->handle_received_data(body, get_body, callback,
459                                                                   ec, bytes_transferred);
460                                      }),
461                 remainder);
462           }
463           return;
464         case body:
465           if (ec == boost::asio::error::eof || is_ssl_short_read_error) {
466             // Here we're handling the case when the connection has been closed
467             // from the server side, or at least that the end of file has been
468             // reached while reading the socket. This signals the end of the
469             // body processing chain.
470             if (callback) {
471               typename protocol_base::buffer_type::const_iterator
472                   begin = this->part.begin(),
473                   end = begin;
474               std::advance(end, bytes_transferred);
475 
476               // We call the callback function synchronously passing the error
477               // condition (in this case, end of file) so that it can handle it
478               // appropriately.
479               if (this->is_chunk_encoding && remove_chunk_markers_) {
480                 callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec);
481               } else {
482                 callback(make_iterator_range(begin, end), ec);
483               }
484             } else {
485               string_type body_string;
486               if (this->is_chunk_encoding && remove_chunk_markers_) {
487                 const auto parse_buffer_size = parse_chunk_encoding.buffer.size();
488                 for (size_t i = 0; i < this->partial_parsed.size(); i += parse_buffer_size) {
489                   auto range = parse_chunk_encoding(boost::make_iterator_range(
490                       this->partial_parsed.cbegin() + i,
491                       this->partial_parsed.cbegin() +
492                           std::min(i + parse_buffer_size, this->partial_parsed.size())));
493                   body_string.append(boost::begin(range), boost::end(range));
494                 }
495                 this->partial_parsed.clear();
496                 auto range = parse_chunk_encoding(boost::make_iterator_range(
497                     this->part.begin(),
498                     this->part.begin() + bytes_transferred));
499                 body_string.append(boost::begin(range), boost::end(range));
500                 this->body_promise.set_value(body_string);
501               } else {
502                 std::swap(body_string, this->partial_parsed);
503                 body_string.append(this->part.begin(),
504                                    this->part.begin() + bytes_transferred);
505                 this->body_promise.set_value(body_string);
506               }
507             }
508             // TODO(dberris): set the destination value somewhere!
509             this->destination_promise.set_value("");
510             this->source_promise.set_value("");
511             // this->part.assign('\0');
512             boost::copy("\0", std::begin(this->part));
513             this->response_parser_.reset();
514             this->timer_.cancel();
515           } else {
516             // This means the connection has not been closed yet and we want to
517             // get more data.
518             if (callback) {
519               // Here we have a body_handler callback. Let's invoke the
520               // callback from here and make sure we're getting more data right
521               // after.
522               typename protocol_base::buffer_type::const_iterator begin =
523                   this->part.begin();
524               typename protocol_base::buffer_type::const_iterator end = begin;
525               std::advance(end, bytes_transferred);
526               if (this->is_chunk_encoding && remove_chunk_markers_) {
527                 callback(parse_chunk_encoding(make_iterator_range(begin, end)), ec);
528               } else {
529                 callback(make_iterator_range(begin, end), ec);
530               }
531               auto self = this->shared_from_this();
532               delegate_->read_some(
533                   boost::asio::mutable_buffers_1(this->part.data(),
534                                                  this->part.size()),
535                   request_strand_.wrap([=] (boost::system::error_code const &ec,
536                                             std::size_t bytes_transferred) {
537                                          self->handle_received_data(body, get_body, callback,
538                                                                     ec, bytes_transferred);
539                                        }));
540             } else {
541               // Here we don't have a body callback. Let's make sure that we
542               // deal with the remainder from the headers part in case we do
543               // have data that's still in the buffer.
544               this->parse_body(
545                   delegate_,
546                   request_strand_.wrap([=] (boost::system::error_code const &ec,
547                                             std::size_t bytes_transferred) {
548                                          self->handle_received_data(body, get_body, callback,
549                                                                     ec, bytes_transferred);
550                                        }),
551                   bytes_transferred);
552             }
553           }
554           return;
555         default:
556           BOOST_ASSERT(false && "Bug, report this to the developers!");
557       }
558     } else {
559       boost::system::error_code report_code = is_timedout_ ? boost::asio::error::timed_out : ec;
560       boost::system::system_error error(report_code);
561       this->source_promise.set_exception(error);
562       this->destination_promise.set_exception(error);
563       switch (state) {
564         case version:
565           this->version_promise.set_exception(error);
566           // fall-through
567         case status:
568           this->status_promise.set_exception(error);
569           // fall-through
570         case status_message:
571           this->status_message_promise.set_exception(error);
572           // fall-through
573         case headers:
574           this->headers_promise.set_exception(error);
575           // fall-through
576         case body:
577           if (!callback) {
578             // N.B. if callback is non-null, then body_promise has already been
579             // set to value "" to indicate body is handled by streaming handler
580             // so no exception should be set
581             this->body_promise.set_exception(error);
582           }
583           else
584             callback( char_const_range(), report_code );
585           break;
586           // fall-through
587         default:
588           BOOST_ASSERT(false && "Bug, report this to the developers!");
589       }
590     }
591   }
592 
593   int timeout_;
594   bool remove_chunk_markers_;
595   boost::asio::steady_timer timer_;
596   bool is_timedout_;
597   bool follow_redirect_;
598   resolver_type& resolver_;
599   resolve_function resolve_;
600   boost::asio::io_service::strand request_strand_;
601   connection_delegate_ptr delegate_;
602   boost::asio::streambuf command_streambuf;
603   string_type method;
604   chunk_encoding_parser_type parse_chunk_encoding;
605 };
606 
607 }  // namespace impl
608 }  // namespace http
609 }  // namespace network
610 }  // namespace boost
611 
612 #endif  // BOOST_NETWORK_PROTOCOL_HTTP_IMPL_HTTP_ASYNC_CONNECTION_HPP_20100601
613