1 #ifndef SIMPLE_WEB_CLIENT_HTTP_HPP
2 #define SIMPLE_WEB_CLIENT_HTTP_HPP
3 
4 #include "asio_compatibility.hpp"
5 #include "mutex.hpp"
6 #include "utility.hpp"
7 #include <limits>
8 #include <random>
9 #include <unordered_set>
10 #include <vector>
11 
12 namespace SimpleWeb {
13   class HeaderEndMatch {
14     int crlfcrlf = 0;
15     int lflf = 0;
16 
17   public:
18     /// Match condition for asio::read_until to match both standard and non-standard HTTP header endings.
operator ()(asio::buffers_iterator<asio::const_buffers_1> begin,asio::buffers_iterator<asio::const_buffers_1> end)19     std::pair<asio::buffers_iterator<asio::const_buffers_1>, bool> operator()(asio::buffers_iterator<asio::const_buffers_1> begin, asio::buffers_iterator<asio::const_buffers_1> end) {
20       auto it = begin;
21       for(; it != end; ++it) {
22         if(*it == '\n') {
23           if(crlfcrlf == 1)
24             ++crlfcrlf;
25           else if(crlfcrlf == 2)
26             crlfcrlf = 0;
27           else if(crlfcrlf == 3)
28             return {++it, true};
29           if(lflf == 0)
30             ++lflf;
31           else if(lflf == 1)
32             return {++it, true};
33         }
34         else if(*it == '\r') {
35           if(crlfcrlf == 0)
36             ++crlfcrlf;
37           else if(crlfcrlf == 2)
38             ++crlfcrlf;
39           else
40             crlfcrlf = 0;
41           lflf = 0;
42         }
43         else {
44           crlfcrlf = 0;
45           lflf = 0;
46         }
47       }
48       return {it, false};
49     }
50   };
51 } // namespace SimpleWeb
52 #ifndef USE_STANDALONE_ASIO
53 namespace boost {
54 #endif
55   namespace asio {
56     template <> struct is_match_condition<SimpleWeb::HeaderEndMatch> : public std::true_type {};
57   } // namespace asio
58 #ifndef USE_STANDALONE_ASIO
59 } // namespace boost
60 #endif
61 
62 namespace SimpleWeb {
63   template <class socket_type>
64   class Client;
65 
66   template <class socket_type>
67   class ClientBase {
68   public:
69     class Content : public std::istream {
70       friend class ClientBase<socket_type>;
71 
72     public:
size()73       std::size_t size() noexcept {
74         return streambuf.size();
75       }
76       /// Convenience function to return content as a string. The stream buffer is consumed.
string()77       std::string string() noexcept {
78         try {
79           std::string str;
80           auto size = streambuf.size();
81           str.resize(size);
82           read(&str[0], static_cast<std::streamsize>(size));
83           return str;
84         }
85         catch(...) {
86           return std::string();
87         }
88       }
89 
90     private:
91       asio::streambuf &streambuf;
Content(asio::streambuf & streambuf)92       Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {}
93     };
94 
95     class Response {
96       friend class ClientBase<socket_type>;
97       friend class Client<socket_type>;
98 
99       asio::streambuf streambuf;
100 
Response(std::size_t max_response_streambuf_size)101       Response(std::size_t max_response_streambuf_size) noexcept : streambuf(max_response_streambuf_size), content(streambuf) {}
102 
103     public:
104       std::string http_version, status_code;
105 
106       Content content;
107 
108       CaseInsensitiveMultimap header;
109     };
110 
111     class Config {
112       friend class ClientBase<socket_type>;
113 
114     private:
Config()115       Config() noexcept {}
116 
117     public:
118       /// Set timeout on requests in seconds. Default value: 0 (no timeout).
119       long timeout = 0;
120       /// Set connect timeout in seconds. Default value: 0 (Config::timeout is then used instead).
121       long timeout_connect = 0;
122       /// Maximum size of response stream buffer. Defaults to architecture maximum.
123       /// Reaching this limit will result in a message_size error code.
124       std::size_t max_response_streambuf_size = std::numeric_limits<std::size_t>::max();
125       /// Set proxy server (server:port)
126       std::string proxy_server;
127     };
128 
129   protected:
130     class Connection : public std::enable_shared_from_this<Connection> {
131     public:
132       template <typename... Args>
Connection(std::shared_ptr<ScopeRunner> handler_runner_,long timeout,Args &&...args)133       Connection(std::shared_ptr<ScopeRunner> handler_runner_, long timeout, Args &&... args) noexcept
134           : handler_runner(std::move(handler_runner_)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {}
135 
136       std::shared_ptr<ScopeRunner> handler_runner;
137       long timeout;
138 
139       std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
140       bool in_use = false;
141       bool event_stream = false;
142       bool attempt_reconnect = true;
143 
144       std::unique_ptr<asio::steady_timer> timer;
145 
close()146       void close() noexcept {
147         error_code ec;
148         socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
149         socket->lowest_layer().cancel(ec);
150       }
151 
set_timeout(long seconds=0)152       void set_timeout(long seconds = 0) noexcept {
153         if(seconds == 0)
154           seconds = timeout;
155         if(seconds == 0) {
156           timer = nullptr;
157           return;
158         }
159         timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(get_socket_executor(*socket), std::chrono::seconds(seconds)));
160         std::weak_ptr<Connection> self_weak(this->shared_from_this()); // To avoid keeping Connection instance alive longer than needed
161         timer->async_wait([self_weak](const error_code &ec) {
162           if(!ec) {
163             if(auto self = self_weak.lock())
164               self->close();
165           }
166         });
167       }
168 
cancel_timeout()169       void cancel_timeout() noexcept {
170         if(timer) {
171           try {
172             timer->cancel();
173           }
174           catch(...) {
175           }
176         }
177       }
178     };
179 
180     class Session {
181     public:
Session(std::size_t max_response_streambuf_size,std::shared_ptr<Connection> connection_,std::unique_ptr<asio::streambuf> request_streambuf_)182       Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection_, std::unique_ptr<asio::streambuf> request_streambuf_) noexcept
183           : connection(std::move(connection_)), request_streambuf(std::move(request_streambuf_)), response(new Response(max_response_streambuf_size)) {}
184 
185       std::shared_ptr<Connection> connection;
186       std::unique_ptr<asio::streambuf> request_streambuf;
187       std::shared_ptr<Response> response;
188       std::function<void(const error_code &)> callback;
189     };
190 
191   public:
192     /// Set before calling a request function.
193     Config config;
194 
195     /// If you want to reuse an already created asio::io_service, store its pointer here before calling a request function.
196     std::shared_ptr<io_context> io_service;
197 
198     /// Convenience function to perform synchronous request. The io_service is run within this function.
199     /// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
200     /// Do not use concurrently with the asynchronous request functions.
201     /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
request(const std::string & method,const std::string & path={"/"},string_view content={},const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())202     std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
203       std::shared_ptr<Response> response;
204       error_code ec;
__anon1e6285bd0202(std::shared_ptr<Response> response_, const error_code &ec_) 205       request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) {
206         response = response_;
207         ec = ec_;
208       });
209 
210       {
211         LockGuard lock(concurrent_synchronous_requests_mutex);
212         ++concurrent_synchronous_requests;
213       }
214       io_service->run();
215       {
216         LockGuard lock(concurrent_synchronous_requests_mutex);
217         --concurrent_synchronous_requests;
218         if(!concurrent_synchronous_requests)
219           restart(*io_service);
220       }
221 
222       if(ec)
223         throw system_error(ec);
224 
225       return response;
226     }
227 
228     /// Convenience function to perform synchronous request. The io_service is run within this function.
229     /// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
230     /// Do not use concurrently with the asynchronous request functions.
231     /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())232     std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
233       std::shared_ptr<Response> response;
234       error_code ec;
235       request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) {
236         response = response_;
237         ec = ec_;
238       });
239 
240       {
241         LockGuard lock(concurrent_synchronous_requests_mutex);
242         ++concurrent_synchronous_requests;
243       }
244       io_service->run();
245       {
246         LockGuard lock(concurrent_synchronous_requests_mutex);
247         --concurrent_synchronous_requests;
248         if(!concurrent_synchronous_requests)
249           restart(*io_service);
250       }
251 
252       if(ec)
253         throw system_error(ec);
254 
255       return response;
256     }
257 
258     /// Asynchronous request where running Client's io_service is required.
259     /// Do not use concurrently with the synchronous request functions.
260     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,const std::string & path,string_view content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)261     void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
262                  std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
263       auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
264       std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed
265       auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
266       session->callback = [this, session_weak, request_callback](const error_code &ec) {
267         if(auto session = session_weak.lock()) {
268           {
269             LockGuard lock(this->connections_mutex);
270             if(!session->connection->event_stream)
271               session->connection->in_use = false;
272 
273             // Remove unused connections, but keep one open for HTTP persistent connection:
274             std::size_t unused_connections = 0;
275             for(auto it = this->connections.begin(); it != this->connections.end();) {
276               if(ec && session->connection == *it)
277                 it = this->connections.erase(it);
278               else if((*it)->in_use)
279                 ++it;
280               else {
281                 ++unused_connections;
282                 if(unused_connections > 1)
283                   it = this->connections.erase(it);
284                 else
285                   ++it;
286               }
287             }
288           }
289 
290           if(*request_callback)
291             (*request_callback)(session->response, ec);
292         }
293       };
294 
295       std::ostream write_stream(session->request_streambuf.get());
296       if(content.size() > 0) {
297         auto header_it = header.find("Content-Length");
298         if(header_it == header.end()) {
299           header_it = header.find("Transfer-Encoding");
300           if(header_it == header.end() || header_it->second != "chunked")
301             write_stream << "Content-Length: " << content.size() << "\r\n";
302         }
303       }
304       write_stream << "\r\n";
305       write_stream.write(content.data(), static_cast<std::streamsize>(content.size()));
306 
307       connect(session);
308     }
309 
310     /// Asynchronous request where running Client's io_service is required.
311     /// Do not use concurrently with the synchronous request functions.
312     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,const std::string & path,string_view content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)313     void request(const std::string &method, const std::string &path, string_view content,
314                  std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
315       request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_));
316     }
317 
318     /// Asynchronous request where running Client's io_service is required.
319     /// Do not use concurrently with the synchronous request functions.
320     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,const std::string & path,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)321     void request(const std::string &method, const std::string &path,
322                  std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
323       request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback_));
324     }
325 
326     /// Asynchronous request where running Client's io_service is required.
327     /// Do not use concurrently with the synchronous request functions.
328     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)329     void request(const std::string &method, std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
330       request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback_));
331     }
332 
333     /// Asynchronous request where running Client's io_service is required.
334     /// Do not use concurrently with the synchronous request functions.
335     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)336     void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
337                  std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
338       auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
339       std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed
340       auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
341       session->callback = [this, session_weak, request_callback](const error_code &ec) {
342         if(auto session = session_weak.lock()) {
343           {
344             LockGuard lock(this->connections_mutex);
345             if(!session->connection->event_stream)
346               session->connection->in_use = false;
347 
348             // Remove unused connections, but keep one open for HTTP persistent connection:
349             std::size_t unused_connections = 0;
350             for(auto it = this->connections.begin(); it != this->connections.end();) {
351               if(ec && session->connection == *it)
352                 it = this->connections.erase(it);
353               else if((*it)->in_use)
354                 ++it;
355               else {
356                 ++unused_connections;
357                 if(unused_connections > 1)
358                   it = this->connections.erase(it);
359                 else
360                   ++it;
361               }
362             }
363           }
364 
365           if(*request_callback)
366             (*request_callback)(session->response, ec);
367         }
368       };
369 
370       content.seekg(0, std::ios::end);
371       auto content_length = content.tellg();
372       content.seekg(0, std::ios::beg);
373       std::ostream write_stream(session->request_streambuf.get());
374       if(content_length > 0) {
375         auto header_it = header.find("Content-Length");
376         if(header_it == header.end()) {
377           header_it = header.find("Transfer-Encoding");
378           if(header_it == header.end() || header_it->second != "chunked")
379             write_stream << "Content-Length: " << content_length << "\r\n";
380         }
381       }
382       write_stream << "\r\n";
383       if(content_length > 0)
384         write_stream << content.rdbuf();
385 
386       connect(session);
387     }
388 
389     /// Asynchronous request where running Client's io_service is required.
390     /// Do not use concurrently with the synchronous request functions.
391     /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call
request(const std::string & method,const std::string & path,std::istream & content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)392     void request(const std::string &method, const std::string &path, std::istream &content,
393                  std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
394       request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_));
395     }
396 
397     /// Close connections.
stop()398     void stop() noexcept {
399       LockGuard lock(connections_mutex);
400       for(auto it = connections.begin(); it != connections.end();) {
401         (*it)->close();
402         it = connections.erase(it);
403       }
404     }
405 
~ClientBase()406     virtual ~ClientBase() noexcept {
407       handler_runner->stop();
408       stop();
409     }
410 
411   protected:
412     bool internal_io_service = false;
413 
414     std::string host;
415     unsigned short port;
416     unsigned short default_port;
417 
418     std::unique_ptr<std::pair<std::string, std::string>> host_port;
419 
420     Mutex connections_mutex;
421     std::unordered_set<std::shared_ptr<Connection>> connections GUARDED_BY(connections_mutex);
422 
423     std::shared_ptr<ScopeRunner> handler_runner;
424 
425     Mutex concurrent_synchronous_requests_mutex;
426     std::size_t concurrent_synchronous_requests GUARDED_BY(concurrent_synchronous_requests_mutex) = 0;
427 
ClientBase(const std::string & host_port,unsigned short default_port)428     ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) {
429       auto parsed_host_port = parse_host_port(host_port, default_port);
430       host = parsed_host_port.first;
431       port = parsed_host_port.second;
432     }
433 
get_connection()434     std::shared_ptr<Connection> get_connection() noexcept {
435       std::shared_ptr<Connection> connection;
436       LockGuard lock(connections_mutex);
437 
438       if(!io_service) {
439         io_service = std::make_shared<io_context>();
440         internal_io_service = true;
441       }
442 
443       for(auto it = connections.begin(); it != connections.end(); ++it) {
444         if(!(*it)->in_use) {
445           connection = *it;
446           break;
447         }
448       }
449       if(!connection) {
450         connection = create_connection();
451         connections.emplace(connection);
452       }
453       connection->attempt_reconnect = true;
454       connection->in_use = true;
455 
456       if(!host_port) {
457         if(config.proxy_server.empty())
458           host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(host, std::to_string(port)));
459         else {
460           auto proxy_host_port = parse_host_port(config.proxy_server, 8080);
461           host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(proxy_host_port.first, std::to_string(proxy_host_port.second)));
462         }
463       }
464 
465       return connection;
466     }
467 
parse_host_port(const std::string & host_port,unsigned short default_port) const468     std::pair<std::string, unsigned short> parse_host_port(const std::string &host_port, unsigned short default_port) const noexcept {
469       std::pair<std::string, unsigned short> parsed_host_port;
470       std::size_t host_end = host_port.find(':');
471       if(host_end == std::string::npos) {
472         parsed_host_port.first = host_port;
473         parsed_host_port.second = default_port;
474       }
475       else {
476         parsed_host_port.first = host_port.substr(0, host_end);
477         parsed_host_port.second = static_cast<unsigned short>(stoul(host_port.substr(host_end + 1)));
478       }
479       return parsed_host_port;
480     }
481 
482     virtual std::shared_ptr<Connection> create_connection() noexcept = 0;
483     virtual void connect(const std::shared_ptr<Session> &) = 0;
484 
create_request_header(const std::string & method,const std::string & path,const CaseInsensitiveMultimap & header) const485     std::unique_ptr<asio::streambuf> create_request_header(const std::string &method, const std::string &path, const CaseInsensitiveMultimap &header) const {
486       auto corrected_path = path;
487       if(corrected_path == "")
488         corrected_path = "/";
489       if(!config.proxy_server.empty() && std::is_same<socket_type, asio::ip::tcp::socket>::value)
490         corrected_path = "http://" + host + ':' + std::to_string(port) + corrected_path;
491 
492       std::unique_ptr<asio::streambuf> streambuf(new asio::streambuf());
493       std::ostream write_stream(streambuf.get());
494       write_stream << method << " " << corrected_path << " HTTP/1.1\r\n";
495       write_stream << "Host: " << host;
496       if(port != default_port)
497         write_stream << ':' << std::to_string(port);
498       write_stream << "\r\n";
499       for(auto &h : header)
500         write_stream << h.first << ": " << h.second << "\r\n";
501       return streambuf;
502     }
503 
write(const std::shared_ptr<Session> & session)504     void write(const std::shared_ptr<Session> &session) {
505       session->connection->set_timeout();
506       asio::async_write(*session->connection->socket, session->request_streambuf->data(), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) {
507         session->connection->cancel_timeout();
508         auto lock = session->connection->handler_runner->continue_lock();
509         if(!lock)
510           return;
511         if(!ec)
512           this->read(session);
513         else
514           session->callback(ec);
515       });
516     }
517 
read(const std::shared_ptr<Session> & session)518     void read(const std::shared_ptr<Session> &session) {
519       session->connection->set_timeout();
520       asio::async_read_until(*session->connection->socket, session->response->streambuf, HeaderEndMatch(), [this, session](const error_code &ec, std::size_t bytes_transferred) {
521         session->connection->cancel_timeout();
522         auto lock = session->connection->handler_runner->continue_lock();
523         if(!lock)
524           return;
525         if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
526           session->callback(make_error_code::make_error_code(errc::message_size));
527           return;
528         }
529 
530         if(!ec) {
531           session->connection->attempt_reconnect = true;
532           std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred;
533 
534           if(!ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) {
535             session->callback(make_error_code::make_error_code(errc::protocol_error));
536             return;
537           }
538 
539           auto header_it = session->response->header.find("Content-Length");
540           if(header_it != session->response->header.end()) {
541             auto content_length = stoull(header_it->second);
542             if(content_length > num_additional_bytes) {
543               session->connection->set_timeout();
544               asio::async_read(*session->connection->socket, session->response->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [session](const error_code &ec, std::size_t /*bytes_transferred*/) {
545                 session->connection->cancel_timeout();
546                 auto lock = session->connection->handler_runner->continue_lock();
547                 if(!lock)
548                   return;
549                 if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
550                   session->callback(make_error_code::make_error_code(errc::message_size));
551                   return;
552                 }
553 
554                 if(!ec)
555                   session->callback(ec);
556                 else
557                   session->callback(ec);
558               });
559             }
560             else
561               session->callback(ec);
562           }
563           else if((header_it = session->response->header.find("Transfer-Encoding")) != session->response->header.end() && header_it->second == "chunked") {
564             auto chunks_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size);
565 
566             // Copy leftover bytes
567             std::ostream ostream(chunks_streambuf.get());
568             auto size = session->response->streambuf.size();
569             std::unique_ptr<char[]> buffer(new char[size]);
570             session->response->content.read(buffer.get(), static_cast<std::streamsize>(size));
571             ostream.write(buffer.get(), static_cast<std::streamsize>(size));
572 
573             this->read_chunked_transfer_encoded(session, chunks_streambuf);
574           }
575           else if(session->response->http_version < "1.1" || ((header_it = session->response->header.find("Session")) != session->response->header.end() && header_it->second == "close")) {
576             session->connection->set_timeout();
577             asio::async_read(*session->connection->socket, session->response->streambuf, [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) {
578               session->connection->cancel_timeout();
579               auto lock = session->connection->handler_runner->continue_lock();
580               if(!lock)
581                 return;
582               if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
583                 session->callback(make_error_code::make_error_code(errc::message_size));
584                 return;
585               }
586 
587               if(!ec) {
588                 {
589                   LockGuard lock(this->connections_mutex);
590                   this->connections.erase(session->connection);
591                 }
592                 session->callback(ec);
593               }
594               else
595                 session->callback(ec == error::eof ? error_code() : ec);
596             });
597           }
598           else if(((header_it = session->response->header.find("Content-Type")) != session->response->header.end() && header_it->second == "text/event-stream")) {
599             session->connection->event_stream = true;
600 
601             auto events_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size);
602 
603             // Copy leftover bytes
604             std::ostream ostream(events_streambuf.get());
605             auto size = session->response->streambuf.size();
606             std::unique_ptr<char[]> buffer(new char[size]);
607             session->response->content.read(buffer.get(), static_cast<std::streamsize>(size));
608             ostream.write(buffer.get(), static_cast<std::streamsize>(size));
609 
610             session->callback(ec); // Connection to a Server-Sent Events resource is opened
611 
612             this->read_server_sent_event(session, events_streambuf);
613           }
614           else
615             session->callback(ec);
616         }
617         else {
618           if(session->connection->attempt_reconnect && ec != error::operation_aborted) {
619             LockGuard lock(connections_mutex);
620             auto it = connections.find(session->connection);
621             if(it != connections.end()) {
622               connections.erase(it);
623               session->connection = create_connection();
624               session->connection->attempt_reconnect = false;
625               session->connection->in_use = true;
626               session->response = std::shared_ptr<Response>(new Response(this->config.max_response_streambuf_size));
627               connections.emplace(session->connection);
628               lock.unlock();
629               this->connect(session);
630             }
631             else {
632               lock.unlock();
633               session->callback(ec);
634             }
635           }
636           else
637             session->callback(ec);
638         }
639       });
640     }
641 
read_chunked_transfer_encoded(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunks_streambuf)642     void read_chunked_transfer_encoded(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunks_streambuf) {
643       session->connection->set_timeout();
644       asio::async_read_until(*session->connection->socket, *chunks_streambuf, "\r\n", [this, session, chunks_streambuf](const error_code &ec, size_t bytes_transferred) {
645         session->connection->cancel_timeout();
646         auto lock = session->connection->handler_runner->continue_lock();
647         if(!lock)
648           return;
649         if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
650           session->callback(make_error_code::make_error_code(errc::message_size));
651           return;
652         }
653 
654         if(!ec) {
655           std::istream istream(chunks_streambuf.get());
656           std::string line;
657           getline(istream, line);
658           bytes_transferred -= line.size() + 1;
659           line.pop_back();
660           unsigned long length = 0;
661           try {
662             length = stoul(line, 0, 16);
663           }
664           catch(...) {
665             session->callback(make_error_code::make_error_code(errc::protocol_error));
666             return;
667           }
668 
669           auto num_additional_bytes = chunks_streambuf->size() - bytes_transferred;
670 
671           if((2 + length) > num_additional_bytes) {
672             session->connection->set_timeout();
673             asio::async_read(*session->connection->socket, *chunks_streambuf, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, chunks_streambuf, length](const error_code &ec, size_t /*bytes_transferred*/) {
674               session->connection->cancel_timeout();
675               auto lock = session->connection->handler_runner->continue_lock();
676               if(!lock)
677                 return;
678               if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
679                 session->callback(make_error_code::make_error_code(errc::message_size));
680                 return;
681               }
682 
683               if(!ec)
684                 this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
685               else
686                 session->callback(ec);
687             });
688           }
689           else
690             this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
691         }
692         else
693           session->callback(ec);
694       });
695     }
696 
read_chunked_transfer_encoded_chunk(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunks_streambuf,unsigned long length)697     void read_chunked_transfer_encoded_chunk(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunks_streambuf, unsigned long length) {
698       std::istream istream(chunks_streambuf.get());
699       if(length > 0) {
700         std::ostream ostream(&session->response->streambuf);
701         std::unique_ptr<char[]> buffer(new char[length]);
702         istream.read(buffer.get(), static_cast<std::streamsize>(length));
703         ostream.write(buffer.get(), static_cast<std::streamsize>(length));
704         if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
705           session->callback(make_error_code::make_error_code(errc::message_size));
706           return;
707         }
708       }
709 
710       // Remove "\r\n"
711       istream.get();
712       istream.get();
713 
714       if(length > 0)
715         read_chunked_transfer_encoded(session, chunks_streambuf);
716       else
717         session->callback(error_code());
718     }
719 
read_server_sent_event(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & events_streambuf)720     void read_server_sent_event(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &events_streambuf) {
721       session->connection->set_timeout();
722       asio::async_read_until(*session->connection->socket, *events_streambuf, HeaderEndMatch(), [this, session, events_streambuf](const error_code &ec, std::size_t /*bytes_transferred*/) {
723         session->connection->cancel_timeout();
724         auto lock = session->connection->handler_runner->continue_lock();
725         if(!lock)
726           return;
727         if(events_streambuf->size() == events_streambuf->max_size()) {
728           session->callback(make_error_code::make_error_code(errc::message_size));
729           return;
730         }
731 
732         if(!ec) {
733           std::istream istream(events_streambuf.get());
734           std::ostream ostream(&session->response->streambuf);
735           std::string line;
736           while(std::getline(istream, line) && !line.empty() && !(line.back() == '\r' && line.size() == 1)) {
737             ostream.write(line.data(), static_cast<std::streamsize>(line.size() - (line.back() == '\r' ? 1 : 0)));
738             ostream.put('\n');
739           }
740 
741           session->callback(ec);
742           read_server_sent_event(session, events_streambuf);
743         }
744         else
745           session->callback(ec);
746       });
747     }
748   };
749 
750   template <class socket_type>
751   class Client : public ClientBase<socket_type> {};
752 
753   using HTTP = asio::ip::tcp::socket;
754 
755   template <>
756   class Client<HTTP> : public ClientBase<HTTP> {
757   public:
758     /**
759      * Constructs a client object.
760      *
761      * @param server_port_path Server resource given by host[:port][/path]
762      */
Client(const std::string & server_port_path)763     Client(const std::string &server_port_path) noexcept : ClientBase<HTTP>::ClientBase(server_port_path, 80) {}
764 
765   protected:
create_connection()766     std::shared_ptr<Connection> create_connection() noexcept override {
767       return std::make_shared<Connection>(handler_runner, config.timeout, *io_service);
768     }
769 
connect(const std::shared_ptr<Session> & session)770     void connect(const std::shared_ptr<Session> &session) override {
771       if(!session->connection->socket->lowest_layer().is_open()) {
772         auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
773         session->connection->set_timeout(config.timeout_connect);
774         async_resolve(*resolver, *host_port, [this, session, resolver](const error_code &ec, resolver_results results) {
775           session->connection->cancel_timeout();
776           auto lock = session->connection->handler_runner->continue_lock();
777           if(!lock)
778             return;
779           if(!ec) {
780             session->connection->set_timeout(config.timeout_connect);
781             asio::async_connect(*session->connection->socket, results, [this, session, resolver](const error_code &ec, async_connect_endpoint /*endpoint*/) {
782               session->connection->cancel_timeout();
783               auto lock = session->connection->handler_runner->continue_lock();
784               if(!lock)
785                 return;
786               if(!ec) {
787                 asio::ip::tcp::no_delay option(true);
788                 error_code ec;
789                 session->connection->socket->set_option(option, ec);
790                 this->write(session);
791               }
792               else
793                 session->callback(ec);
794             });
795           }
796           else
797             session->callback(ec);
798         });
799       }
800       else
801         write(session);
802     }
803   };
804 } // namespace SimpleWeb
805 
806 #endif /* SIMPLE_WEB_CLIENT_HTTP_HPP */
807