1 #ifndef SIMPLE_WEB_CLIENT_HTTP_HPP 2 #define SIMPLE_WEB_CLIENT_HTTP_HPP 3 4 #include "asio_compatibility.hpp" 5 #include "mutex.hpp" 6 #include "utility.hpp" 7 #include <limits> 8 #include <random> 9 #include <unordered_set> 10 #include <vector> 11 12 namespace SimpleWeb { 13 class HeaderEndMatch { 14 int crlfcrlf = 0; 15 int lflf = 0; 16 17 public: 18 /// Match condition for asio::read_until to match both standard and non-standard HTTP header endings. operator ()(asio::buffers_iterator<asio::const_buffers_1> begin,asio::buffers_iterator<asio::const_buffers_1> end)19 std::pair<asio::buffers_iterator<asio::const_buffers_1>, bool> operator()(asio::buffers_iterator<asio::const_buffers_1> begin, asio::buffers_iterator<asio::const_buffers_1> end) { 20 auto it = begin; 21 for(; it != end; ++it) { 22 if(*it == '\n') { 23 if(crlfcrlf == 1) 24 ++crlfcrlf; 25 else if(crlfcrlf == 2) 26 crlfcrlf = 0; 27 else if(crlfcrlf == 3) 28 return {++it, true}; 29 if(lflf == 0) 30 ++lflf; 31 else if(lflf == 1) 32 return {++it, true}; 33 } 34 else if(*it == '\r') { 35 if(crlfcrlf == 0) 36 ++crlfcrlf; 37 else if(crlfcrlf == 2) 38 ++crlfcrlf; 39 else 40 crlfcrlf = 0; 41 lflf = 0; 42 } 43 else { 44 crlfcrlf = 0; 45 lflf = 0; 46 } 47 } 48 return {it, false}; 49 } 50 }; 51 } // namespace SimpleWeb 52 #ifndef USE_STANDALONE_ASIO 53 namespace boost { 54 #endif 55 namespace asio { 56 template <> struct is_match_condition<SimpleWeb::HeaderEndMatch> : public std::true_type {}; 57 } // namespace asio 58 #ifndef USE_STANDALONE_ASIO 59 } // namespace boost 60 #endif 61 62 namespace SimpleWeb { 63 template <class socket_type> 64 class Client; 65 66 template <class socket_type> 67 class ClientBase { 68 public: 69 class Content : public std::istream { 70 friend class ClientBase<socket_type>; 71 72 public: size()73 std::size_t size() noexcept { 74 return streambuf.size(); 75 } 76 /// Convenience function to return content as a string. The stream buffer is consumed. string()77 std::string string() noexcept { 78 try { 79 std::string str; 80 auto size = streambuf.size(); 81 str.resize(size); 82 read(&str[0], static_cast<std::streamsize>(size)); 83 return str; 84 } 85 catch(...) { 86 return std::string(); 87 } 88 } 89 90 private: 91 asio::streambuf &streambuf; Content(asio::streambuf & streambuf)92 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {} 93 }; 94 95 class Response { 96 friend class ClientBase<socket_type>; 97 friend class Client<socket_type>; 98 99 asio::streambuf streambuf; 100 Response(std::size_t max_response_streambuf_size)101 Response(std::size_t max_response_streambuf_size) noexcept : streambuf(max_response_streambuf_size), content(streambuf) {} 102 103 public: 104 std::string http_version, status_code; 105 106 Content content; 107 108 CaseInsensitiveMultimap header; 109 }; 110 111 class Config { 112 friend class ClientBase<socket_type>; 113 114 private: Config()115 Config() noexcept {} 116 117 public: 118 /// Set timeout on requests in seconds. Default value: 0 (no timeout). 119 long timeout = 0; 120 /// Set connect timeout in seconds. Default value: 0 (Config::timeout is then used instead). 121 long timeout_connect = 0; 122 /// Maximum size of response stream buffer. Defaults to architecture maximum. 123 /// Reaching this limit will result in a message_size error code. 124 std::size_t max_response_streambuf_size = std::numeric_limits<std::size_t>::max(); 125 /// Set proxy server (server:port) 126 std::string proxy_server; 127 }; 128 129 protected: 130 class Connection : public std::enable_shared_from_this<Connection> { 131 public: 132 template <typename... Args> Connection(std::shared_ptr<ScopeRunner> handler_runner_,long timeout,Args &&...args)133 Connection(std::shared_ptr<ScopeRunner> handler_runner_, long timeout, Args &&... args) noexcept 134 : handler_runner(std::move(handler_runner_)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {} 135 136 std::shared_ptr<ScopeRunner> handler_runner; 137 long timeout; 138 139 std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable 140 bool in_use = false; 141 bool event_stream = false; 142 bool attempt_reconnect = true; 143 144 std::unique_ptr<asio::steady_timer> timer; 145 close()146 void close() noexcept { 147 error_code ec; 148 socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); 149 socket->lowest_layer().cancel(ec); 150 } 151 set_timeout(long seconds=0)152 void set_timeout(long seconds = 0) noexcept { 153 if(seconds == 0) 154 seconds = timeout; 155 if(seconds == 0) { 156 timer = nullptr; 157 return; 158 } 159 timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(get_socket_executor(*socket), std::chrono::seconds(seconds))); 160 std::weak_ptr<Connection> self_weak(this->shared_from_this()); // To avoid keeping Connection instance alive longer than needed 161 timer->async_wait([self_weak](const error_code &ec) { 162 if(!ec) { 163 if(auto self = self_weak.lock()) 164 self->close(); 165 } 166 }); 167 } 168 cancel_timeout()169 void cancel_timeout() noexcept { 170 if(timer) { 171 try { 172 timer->cancel(); 173 } 174 catch(...) { 175 } 176 } 177 } 178 }; 179 180 class Session { 181 public: Session(std::size_t max_response_streambuf_size,std::shared_ptr<Connection> connection_,std::unique_ptr<asio::streambuf> request_streambuf_)182 Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection_, std::unique_ptr<asio::streambuf> request_streambuf_) noexcept 183 : connection(std::move(connection_)), request_streambuf(std::move(request_streambuf_)), response(new Response(max_response_streambuf_size)) {} 184 185 std::shared_ptr<Connection> connection; 186 std::unique_ptr<asio::streambuf> request_streambuf; 187 std::shared_ptr<Response> response; 188 std::function<void(const error_code &)> callback; 189 }; 190 191 public: 192 /// Set before calling a request function. 193 Config config; 194 195 /// If you want to reuse an already created asio::io_service, store its pointer here before calling a request function. 196 std::shared_ptr<io_context> io_service; 197 198 /// Convenience function to perform synchronous request. The io_service is run within this function. 199 /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. 200 /// Do not use concurrently with the asynchronous request functions. 201 /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. request(const std::string & method,const std::string & path={"/"},string_view content={},const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())202 std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 203 std::shared_ptr<Response> response; 204 error_code ec; __anon1e6285bd0202(std::shared_ptr<Response> response_, const error_code &ec_) 205 request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) { 206 response = response_; 207 ec = ec_; 208 }); 209 210 { 211 LockGuard lock(concurrent_synchronous_requests_mutex); 212 ++concurrent_synchronous_requests; 213 } 214 io_service->run(); 215 { 216 LockGuard lock(concurrent_synchronous_requests_mutex); 217 --concurrent_synchronous_requests; 218 if(!concurrent_synchronous_requests) 219 restart(*io_service); 220 } 221 222 if(ec) 223 throw system_error(ec); 224 225 return response; 226 } 227 228 /// Convenience function to perform synchronous request. The io_service is run within this function. 229 /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. 230 /// Do not use concurrently with the asynchronous request functions. 231 /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())232 std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 233 std::shared_ptr<Response> response; 234 error_code ec; 235 request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) { 236 response = response_; 237 ec = ec_; 238 }); 239 240 { 241 LockGuard lock(concurrent_synchronous_requests_mutex); 242 ++concurrent_synchronous_requests; 243 } 244 io_service->run(); 245 { 246 LockGuard lock(concurrent_synchronous_requests_mutex); 247 --concurrent_synchronous_requests; 248 if(!concurrent_synchronous_requests) 249 restart(*io_service); 250 } 251 252 if(ec) 253 throw system_error(ec); 254 255 return response; 256 } 257 258 /// Asynchronous request where running Client's io_service is required. 259 /// Do not use concurrently with the synchronous request functions. 260 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,string_view content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)261 void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header, 262 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 263 auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); 264 std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed 265 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); 266 session->callback = [this, session_weak, request_callback](const error_code &ec) { 267 if(auto session = session_weak.lock()) { 268 { 269 LockGuard lock(this->connections_mutex); 270 if(!session->connection->event_stream) 271 session->connection->in_use = false; 272 273 // Remove unused connections, but keep one open for HTTP persistent connection: 274 std::size_t unused_connections = 0; 275 for(auto it = this->connections.begin(); it != this->connections.end();) { 276 if(ec && session->connection == *it) 277 it = this->connections.erase(it); 278 else if((*it)->in_use) 279 ++it; 280 else { 281 ++unused_connections; 282 if(unused_connections > 1) 283 it = this->connections.erase(it); 284 else 285 ++it; 286 } 287 } 288 } 289 290 if(*request_callback) 291 (*request_callback)(session->response, ec); 292 } 293 }; 294 295 std::ostream write_stream(session->request_streambuf.get()); 296 if(content.size() > 0) { 297 auto header_it = header.find("Content-Length"); 298 if(header_it == header.end()) { 299 header_it = header.find("Transfer-Encoding"); 300 if(header_it == header.end() || header_it->second != "chunked") 301 write_stream << "Content-Length: " << content.size() << "\r\n"; 302 } 303 } 304 write_stream << "\r\n"; 305 write_stream.write(content.data(), static_cast<std::streamsize>(content.size())); 306 307 connect(session); 308 } 309 310 /// Asynchronous request where running Client's io_service is required. 311 /// Do not use concurrently with the synchronous request functions. 312 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,string_view content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)313 void request(const std::string &method, const std::string &path, string_view content, 314 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 315 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_)); 316 } 317 318 /// Asynchronous request where running Client's io_service is required. 319 /// Do not use concurrently with the synchronous request functions. 320 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)321 void request(const std::string &method, const std::string &path, 322 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 323 request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback_)); 324 } 325 326 /// Asynchronous request where running Client's io_service is required. 327 /// Do not use concurrently with the synchronous request functions. 328 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)329 void request(const std::string &method, std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 330 request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback_)); 331 } 332 333 /// Asynchronous request where running Client's io_service is required. 334 /// Do not use concurrently with the synchronous request functions. 335 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)336 void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header, 337 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 338 auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); 339 std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed 340 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); 341 session->callback = [this, session_weak, request_callback](const error_code &ec) { 342 if(auto session = session_weak.lock()) { 343 { 344 LockGuard lock(this->connections_mutex); 345 if(!session->connection->event_stream) 346 session->connection->in_use = false; 347 348 // Remove unused connections, but keep one open for HTTP persistent connection: 349 std::size_t unused_connections = 0; 350 for(auto it = this->connections.begin(); it != this->connections.end();) { 351 if(ec && session->connection == *it) 352 it = this->connections.erase(it); 353 else if((*it)->in_use) 354 ++it; 355 else { 356 ++unused_connections; 357 if(unused_connections > 1) 358 it = this->connections.erase(it); 359 else 360 ++it; 361 } 362 } 363 } 364 365 if(*request_callback) 366 (*request_callback)(session->response, ec); 367 } 368 }; 369 370 content.seekg(0, std::ios::end); 371 auto content_length = content.tellg(); 372 content.seekg(0, std::ios::beg); 373 std::ostream write_stream(session->request_streambuf.get()); 374 if(content_length > 0) { 375 auto header_it = header.find("Content-Length"); 376 if(header_it == header.end()) { 377 header_it = header.find("Transfer-Encoding"); 378 if(header_it == header.end() || header_it->second != "chunked") 379 write_stream << "Content-Length: " << content_length << "\r\n"; 380 } 381 } 382 write_stream << "\r\n"; 383 if(content_length > 0) 384 write_stream << content.rdbuf(); 385 386 connect(session); 387 } 388 389 /// Asynchronous request where running Client's io_service is required. 390 /// Do not use concurrently with the synchronous request functions. 391 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::istream & content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)392 void request(const std::string &method, const std::string &path, std::istream &content, 393 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 394 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_)); 395 } 396 397 /// Close connections. stop()398 void stop() noexcept { 399 LockGuard lock(connections_mutex); 400 for(auto it = connections.begin(); it != connections.end();) { 401 (*it)->close(); 402 it = connections.erase(it); 403 } 404 } 405 ~ClientBase()406 virtual ~ClientBase() noexcept { 407 handler_runner->stop(); 408 stop(); 409 } 410 411 protected: 412 bool internal_io_service = false; 413 414 std::string host; 415 unsigned short port; 416 unsigned short default_port; 417 418 std::unique_ptr<std::pair<std::string, std::string>> host_port; 419 420 Mutex connections_mutex; 421 std::unordered_set<std::shared_ptr<Connection>> connections GUARDED_BY(connections_mutex); 422 423 std::shared_ptr<ScopeRunner> handler_runner; 424 425 Mutex concurrent_synchronous_requests_mutex; 426 std::size_t concurrent_synchronous_requests GUARDED_BY(concurrent_synchronous_requests_mutex) = 0; 427 ClientBase(const std::string & host_port,unsigned short default_port)428 ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) { 429 auto parsed_host_port = parse_host_port(host_port, default_port); 430 host = parsed_host_port.first; 431 port = parsed_host_port.second; 432 } 433 get_connection()434 std::shared_ptr<Connection> get_connection() noexcept { 435 std::shared_ptr<Connection> connection; 436 LockGuard lock(connections_mutex); 437 438 if(!io_service) { 439 io_service = std::make_shared<io_context>(); 440 internal_io_service = true; 441 } 442 443 for(auto it = connections.begin(); it != connections.end(); ++it) { 444 if(!(*it)->in_use) { 445 connection = *it; 446 break; 447 } 448 } 449 if(!connection) { 450 connection = create_connection(); 451 connections.emplace(connection); 452 } 453 connection->attempt_reconnect = true; 454 connection->in_use = true; 455 456 if(!host_port) { 457 if(config.proxy_server.empty()) 458 host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(host, std::to_string(port))); 459 else { 460 auto proxy_host_port = parse_host_port(config.proxy_server, 8080); 461 host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(proxy_host_port.first, std::to_string(proxy_host_port.second))); 462 } 463 } 464 465 return connection; 466 } 467 parse_host_port(const std::string & host_port,unsigned short default_port) const468 std::pair<std::string, unsigned short> parse_host_port(const std::string &host_port, unsigned short default_port) const noexcept { 469 std::pair<std::string, unsigned short> parsed_host_port; 470 std::size_t host_end = host_port.find(':'); 471 if(host_end == std::string::npos) { 472 parsed_host_port.first = host_port; 473 parsed_host_port.second = default_port; 474 } 475 else { 476 parsed_host_port.first = host_port.substr(0, host_end); 477 parsed_host_port.second = static_cast<unsigned short>(stoul(host_port.substr(host_end + 1))); 478 } 479 return parsed_host_port; 480 } 481 482 virtual std::shared_ptr<Connection> create_connection() noexcept = 0; 483 virtual void connect(const std::shared_ptr<Session> &) = 0; 484 create_request_header(const std::string & method,const std::string & path,const CaseInsensitiveMultimap & header) const485 std::unique_ptr<asio::streambuf> create_request_header(const std::string &method, const std::string &path, const CaseInsensitiveMultimap &header) const { 486 auto corrected_path = path; 487 if(corrected_path == "") 488 corrected_path = "/"; 489 if(!config.proxy_server.empty() && std::is_same<socket_type, asio::ip::tcp::socket>::value) 490 corrected_path = "http://" + host + ':' + std::to_string(port) + corrected_path; 491 492 std::unique_ptr<asio::streambuf> streambuf(new asio::streambuf()); 493 std::ostream write_stream(streambuf.get()); 494 write_stream << method << " " << corrected_path << " HTTP/1.1\r\n"; 495 write_stream << "Host: " << host; 496 if(port != default_port) 497 write_stream << ':' << std::to_string(port); 498 write_stream << "\r\n"; 499 for(auto &h : header) 500 write_stream << h.first << ": " << h.second << "\r\n"; 501 return streambuf; 502 } 503 write(const std::shared_ptr<Session> & session)504 void write(const std::shared_ptr<Session> &session) { 505 session->connection->set_timeout(); 506 asio::async_write(*session->connection->socket, session->request_streambuf->data(), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { 507 session->connection->cancel_timeout(); 508 auto lock = session->connection->handler_runner->continue_lock(); 509 if(!lock) 510 return; 511 if(!ec) 512 this->read(session); 513 else 514 session->callback(ec); 515 }); 516 } 517 read(const std::shared_ptr<Session> & session)518 void read(const std::shared_ptr<Session> &session) { 519 session->connection->set_timeout(); 520 asio::async_read_until(*session->connection->socket, session->response->streambuf, HeaderEndMatch(), [this, session](const error_code &ec, std::size_t bytes_transferred) { 521 session->connection->cancel_timeout(); 522 auto lock = session->connection->handler_runner->continue_lock(); 523 if(!lock) 524 return; 525 if(session->response->streambuf.size() == session->response->streambuf.max_size()) { 526 session->callback(make_error_code::make_error_code(errc::message_size)); 527 return; 528 } 529 530 if(!ec) { 531 session->connection->attempt_reconnect = true; 532 std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred; 533 534 if(!ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) { 535 session->callback(make_error_code::make_error_code(errc::protocol_error)); 536 return; 537 } 538 539 auto header_it = session->response->header.find("Content-Length"); 540 if(header_it != session->response->header.end()) { 541 auto content_length = stoull(header_it->second); 542 if(content_length > num_additional_bytes) { 543 session->connection->set_timeout(); 544 asio::async_read(*session->connection->socket, session->response->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [session](const error_code &ec, std::size_t /*bytes_transferred*/) { 545 session->connection->cancel_timeout(); 546 auto lock = session->connection->handler_runner->continue_lock(); 547 if(!lock) 548 return; 549 if(session->response->streambuf.size() == session->response->streambuf.max_size()) { 550 session->callback(make_error_code::make_error_code(errc::message_size)); 551 return; 552 } 553 554 if(!ec) 555 session->callback(ec); 556 else 557 session->callback(ec); 558 }); 559 } 560 else 561 session->callback(ec); 562 } 563 else if((header_it = session->response->header.find("Transfer-Encoding")) != session->response->header.end() && header_it->second == "chunked") { 564 auto chunks_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size); 565 566 // Copy leftover bytes 567 std::ostream ostream(chunks_streambuf.get()); 568 auto size = session->response->streambuf.size(); 569 std::unique_ptr<char[]> buffer(new char[size]); 570 session->response->content.read(buffer.get(), static_cast<std::streamsize>(size)); 571 ostream.write(buffer.get(), static_cast<std::streamsize>(size)); 572 573 this->read_chunked_transfer_encoded(session, chunks_streambuf); 574 } 575 else if(session->response->http_version < "1.1" || ((header_it = session->response->header.find("Session")) != session->response->header.end() && header_it->second == "close")) { 576 session->connection->set_timeout(); 577 asio::async_read(*session->connection->socket, session->response->streambuf, [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { 578 session->connection->cancel_timeout(); 579 auto lock = session->connection->handler_runner->continue_lock(); 580 if(!lock) 581 return; 582 if(session->response->streambuf.size() == session->response->streambuf.max_size()) { 583 session->callback(make_error_code::make_error_code(errc::message_size)); 584 return; 585 } 586 587 if(!ec) { 588 { 589 LockGuard lock(this->connections_mutex); 590 this->connections.erase(session->connection); 591 } 592 session->callback(ec); 593 } 594 else 595 session->callback(ec == error::eof ? error_code() : ec); 596 }); 597 } 598 else if(((header_it = session->response->header.find("Content-Type")) != session->response->header.end() && header_it->second == "text/event-stream")) { 599 session->connection->event_stream = true; 600 601 auto events_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size); 602 603 // Copy leftover bytes 604 std::ostream ostream(events_streambuf.get()); 605 auto size = session->response->streambuf.size(); 606 std::unique_ptr<char[]> buffer(new char[size]); 607 session->response->content.read(buffer.get(), static_cast<std::streamsize>(size)); 608 ostream.write(buffer.get(), static_cast<std::streamsize>(size)); 609 610 session->callback(ec); // Connection to a Server-Sent Events resource is opened 611 612 this->read_server_sent_event(session, events_streambuf); 613 } 614 else 615 session->callback(ec); 616 } 617 else { 618 if(session->connection->attempt_reconnect && ec != error::operation_aborted) { 619 LockGuard lock(connections_mutex); 620 auto it = connections.find(session->connection); 621 if(it != connections.end()) { 622 connections.erase(it); 623 session->connection = create_connection(); 624 session->connection->attempt_reconnect = false; 625 session->connection->in_use = true; 626 session->response = std::shared_ptr<Response>(new Response(this->config.max_response_streambuf_size)); 627 connections.emplace(session->connection); 628 lock.unlock(); 629 this->connect(session); 630 } 631 else { 632 lock.unlock(); 633 session->callback(ec); 634 } 635 } 636 else 637 session->callback(ec); 638 } 639 }); 640 } 641 read_chunked_transfer_encoded(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunks_streambuf)642 void read_chunked_transfer_encoded(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunks_streambuf) { 643 session->connection->set_timeout(); 644 asio::async_read_until(*session->connection->socket, *chunks_streambuf, "\r\n", [this, session, chunks_streambuf](const error_code &ec, size_t bytes_transferred) { 645 session->connection->cancel_timeout(); 646 auto lock = session->connection->handler_runner->continue_lock(); 647 if(!lock) 648 return; 649 if(chunks_streambuf->size() == chunks_streambuf->max_size()) { 650 session->callback(make_error_code::make_error_code(errc::message_size)); 651 return; 652 } 653 654 if(!ec) { 655 std::istream istream(chunks_streambuf.get()); 656 std::string line; 657 getline(istream, line); 658 bytes_transferred -= line.size() + 1; 659 line.pop_back(); 660 unsigned long length = 0; 661 try { 662 length = stoul(line, 0, 16); 663 } 664 catch(...) { 665 session->callback(make_error_code::make_error_code(errc::protocol_error)); 666 return; 667 } 668 669 auto num_additional_bytes = chunks_streambuf->size() - bytes_transferred; 670 671 if((2 + length) > num_additional_bytes) { 672 session->connection->set_timeout(); 673 asio::async_read(*session->connection->socket, *chunks_streambuf, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, chunks_streambuf, length](const error_code &ec, size_t /*bytes_transferred*/) { 674 session->connection->cancel_timeout(); 675 auto lock = session->connection->handler_runner->continue_lock(); 676 if(!lock) 677 return; 678 if(chunks_streambuf->size() == chunks_streambuf->max_size()) { 679 session->callback(make_error_code::make_error_code(errc::message_size)); 680 return; 681 } 682 683 if(!ec) 684 this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length); 685 else 686 session->callback(ec); 687 }); 688 } 689 else 690 this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length); 691 } 692 else 693 session->callback(ec); 694 }); 695 } 696 read_chunked_transfer_encoded_chunk(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunks_streambuf,unsigned long length)697 void read_chunked_transfer_encoded_chunk(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunks_streambuf, unsigned long length) { 698 std::istream istream(chunks_streambuf.get()); 699 if(length > 0) { 700 std::ostream ostream(&session->response->streambuf); 701 std::unique_ptr<char[]> buffer(new char[length]); 702 istream.read(buffer.get(), static_cast<std::streamsize>(length)); 703 ostream.write(buffer.get(), static_cast<std::streamsize>(length)); 704 if(session->response->streambuf.size() == session->response->streambuf.max_size()) { 705 session->callback(make_error_code::make_error_code(errc::message_size)); 706 return; 707 } 708 } 709 710 // Remove "\r\n" 711 istream.get(); 712 istream.get(); 713 714 if(length > 0) 715 read_chunked_transfer_encoded(session, chunks_streambuf); 716 else 717 session->callback(error_code()); 718 } 719 read_server_sent_event(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & events_streambuf)720 void read_server_sent_event(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &events_streambuf) { 721 session->connection->set_timeout(); 722 asio::async_read_until(*session->connection->socket, *events_streambuf, HeaderEndMatch(), [this, session, events_streambuf](const error_code &ec, std::size_t /*bytes_transferred*/) { 723 session->connection->cancel_timeout(); 724 auto lock = session->connection->handler_runner->continue_lock(); 725 if(!lock) 726 return; 727 if(events_streambuf->size() == events_streambuf->max_size()) { 728 session->callback(make_error_code::make_error_code(errc::message_size)); 729 return; 730 } 731 732 if(!ec) { 733 std::istream istream(events_streambuf.get()); 734 std::ostream ostream(&session->response->streambuf); 735 std::string line; 736 while(std::getline(istream, line) && !line.empty() && !(line.back() == '\r' && line.size() == 1)) { 737 ostream.write(line.data(), static_cast<std::streamsize>(line.size() - (line.back() == '\r' ? 1 : 0))); 738 ostream.put('\n'); 739 } 740 741 session->callback(ec); 742 read_server_sent_event(session, events_streambuf); 743 } 744 else 745 session->callback(ec); 746 }); 747 } 748 }; 749 750 template <class socket_type> 751 class Client : public ClientBase<socket_type> {}; 752 753 using HTTP = asio::ip::tcp::socket; 754 755 template <> 756 class Client<HTTP> : public ClientBase<HTTP> { 757 public: 758 /** 759 * Constructs a client object. 760 * 761 * @param server_port_path Server resource given by host[:port][/path] 762 */ Client(const std::string & server_port_path)763 Client(const std::string &server_port_path) noexcept : ClientBase<HTTP>::ClientBase(server_port_path, 80) {} 764 765 protected: create_connection()766 std::shared_ptr<Connection> create_connection() noexcept override { 767 return std::make_shared<Connection>(handler_runner, config.timeout, *io_service); 768 } 769 connect(const std::shared_ptr<Session> & session)770 void connect(const std::shared_ptr<Session> &session) override { 771 if(!session->connection->socket->lowest_layer().is_open()) { 772 auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service); 773 session->connection->set_timeout(config.timeout_connect); 774 async_resolve(*resolver, *host_port, [this, session, resolver](const error_code &ec, resolver_results results) { 775 session->connection->cancel_timeout(); 776 auto lock = session->connection->handler_runner->continue_lock(); 777 if(!lock) 778 return; 779 if(!ec) { 780 session->connection->set_timeout(config.timeout_connect); 781 asio::async_connect(*session->connection->socket, results, [this, session, resolver](const error_code &ec, async_connect_endpoint /*endpoint*/) { 782 session->connection->cancel_timeout(); 783 auto lock = session->connection->handler_runner->continue_lock(); 784 if(!lock) 785 return; 786 if(!ec) { 787 asio::ip::tcp::no_delay option(true); 788 error_code ec; 789 session->connection->socket->set_option(option, ec); 790 this->write(session); 791 } 792 else 793 session->callback(ec); 794 }); 795 } 796 else 797 session->callback(ec); 798 }); 799 } 800 else 801 write(session); 802 } 803 }; 804 } // namespace SimpleWeb 805 806 #endif /* SIMPLE_WEB_CLIENT_HTTP_HPP */ 807