1 /*
2 * Copyright (C) 2008 Emweb bv, Herent, Belgium.
3 *
4 * All rights reserved.
5 */
6 //
7 // connection.cpp
8 // ~~~~~~~~~~~~~~
9 //
10 // Copyright (c) 2003-2006 Christopher M. Kohlhoff (chris at kohlhoff dot com)
11 //
12 // Distributed under the Boost Software License, Version 1.0. (See accompanying
13 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
14 //
15
16 #define DEBUG
17
18 #include <vector>
19
20 #include "Connection.h"
21 #include "ConnectionManager.h"
22 #include "RequestHandler.h"
23 #include "StockReply.h"
24 #include "Server.h"
25 #include "WebController.h"
26
27 namespace Wt {
28 LOGGER("wthttp/async");
29 }
30
31 #if BOOST_VERSION >= 104900
32 typedef std::chrono::seconds asio_timer_seconds;
33 #else
34 typedef boost::posix_time::seconds asio_timer_seconds;
35 #endif
36
37 namespace http {
38 namespace server {
39
40 static const int CONNECTION_TIMEOUT = 300; // 5 minutes
41 static const int BODY_TIMEOUT = 600; // 10 minutes
42 static const int KEEPALIVE_TIMEOUT = 10; // 10 seconds
43
Connection(asio::io_service & io_service,Server * server,ConnectionManager & manager,RequestHandler & handler)44 Connection::Connection(asio::io_service& io_service, Server *server,
45 ConnectionManager& manager, RequestHandler& handler)
46 : ConnectionManager_(manager),
47 strand_(io_service),
48 state_(Idle),
49 request_handler_(handler),
50 readTimer_(io_service),
51 writeTimer_(io_service),
52 request_parser_(server),
53 server_(server),
54 waitingResponse_(false),
55 haveResponse_(false),
56 responseDone_(false)
57 { }
58
~Connection()59 Connection::~Connection()
60 {
61 LOG_DEBUG("~Connection");
62 }
63
64 #if (defined(WT_ASIO_IS_BOOST_ASIO) && BOOST_VERSION >= 106600) || (defined(WT_ASIO_IS_STANDALONE_ASIO) && ASIO_VERSION >= 101100)
native()65 asio::ip::tcp::socket::native_handle_type Connection::native()
66 {
67 return socket().native_handle();
68 }
69 #else
native()70 asio::ip::tcp::socket::native_type Connection::native()
71 {
72 return socket().native();
73 }
74 #endif
75
finishReply()76 void Connection::finishReply()
77 {
78 if (!request_.uri.empty()) {
79 LOG_DEBUG("last request: " << request_.method.str()
80 << " " << request_.uri.str()
81 << " (ws:" << request_.webSocketVersion << ")");
82 }
83 }
84
scheduleStop()85 void Connection::scheduleStop()
86 {
87 server_->service()
88 .post(strand_.wrap(std::bind(&Connection::stop, shared_from_this())));
89 }
90
start()91 void Connection::start()
92 {
93 LOG_DEBUG(native() << ": start()");
94
95 request_parser_.reset();
96 request_.reset();
97 try {
98 request_.remoteIP = socket().remote_endpoint().address().to_string();
99 request_.port = socket().local_endpoint().port();
100 } catch (std::exception& e) {
101 LOG_ERROR("remote_endpoint() threw: " << e.what());
102 }
103
104 Wt::AsioWrapper::error_code ignored_ec;
105 socket().set_option(asio::ip::tcp::no_delay(true), ignored_ec);
106
107 rcv_buffers_.push_back(Buffer());
108 startAsyncReadRequest(rcv_buffers_.back(), CONNECTION_TIMEOUT);
109 }
110
stop()111 void Connection::stop()
112 {
113 lastWtReply_.reset();
114 lastProxyReply_.reset();
115 lastStaticReply_.reset();
116 }
117
setReadTimeout(int seconds)118 void Connection::setReadTimeout(int seconds)
119 {
120 if (seconds != 0) {
121 LOG_DEBUG(native() << " setting read timeout (ws: "
122 << request_.webSocketVersion << ")");
123 state_ |= Reading;
124
125 readTimer_.expires_from_now(asio_timer_seconds(seconds));
126 readTimer_.async_wait(std::bind(&Connection::timeout, shared_from_this(),
127 std::placeholders::_1));
128 }
129 }
130
setWriteTimeout(int seconds)131 void Connection::setWriteTimeout(int seconds)
132 {
133 LOG_DEBUG(native() << " setting write timeout (ws: "
134 << request_.webSocketVersion << ")");
135 state_ |= Writing;
136
137 writeTimer_.expires_from_now(asio_timer_seconds(seconds));
138 writeTimer_.async_wait(std::bind(&Connection::timeout, shared_from_this(),
139 std::placeholders::_1));
140 }
141
cancelReadTimer()142 void Connection::cancelReadTimer()
143 {
144 LOG_DEBUG(native() << " cancel read timeout");
145 state_.clear(Reading);
146
147 readTimer_.cancel();
148 }
149
cancelWriteTimer()150 void Connection::cancelWriteTimer()
151 {
152 LOG_DEBUG(native() << " cancel write timeout");
153 state_.clear(Writing);
154
155 writeTimer_.cancel();
156 }
157
timeout(const Wt::AsioWrapper::error_code & e)158 void Connection::timeout(const Wt::AsioWrapper::error_code& e)
159 {
160 if (e != asio::error::operation_aborted)
161 strand_.post(std::bind(&Connection::doTimeout, shared_from_this()));
162 }
163
doTimeout()164 void Connection::doTimeout()
165 {
166 Wt::AsioWrapper::error_code ignored_ec;
167 socket().shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
168 readTimer_.cancel();
169 writeTimer_.cancel();
170 }
171
handleReadRequest0()172 void Connection::handleReadRequest0()
173 {
174 Buffer& buffer = rcv_buffers_.back();
175
176 #ifdef DEBUG
177 try {
178 LOG_DEBUG(socket().native_handle() << "incoming request: "
179 << socket().remote_endpoint().port() << " (avail= "
180 << (rcv_buffer_size_ - (rcv_remaining_ - buffer.data())) << "): "
181 << std::string(rcv_remaining_,
182 std::min((unsigned long)(buffer.data()
183 - rcv_remaining_ + rcv_buffer_size_),
184 (long unsigned)1000)));
185 } catch (...) {
186 }
187 #endif // DEBUG
188
189 boost::tribool result;
190 boost::tie(result, rcv_remaining_)
191 = request_parser_.parse(request_,
192 &*rcv_remaining_, buffer.data() + rcv_buffer_size_);
193
194 if (result) {
195 Reply::status_type status = request_parser_.validate(request_);
196 // FIXME: Let the reply decide whether we're doing websockets, move this logic to WtReply
197 bool doWebSockets = server_->controller()->configuration().webSockets() &&
198 (server_->controller()->configuration().sessionPolicy() != Wt::Configuration::DedicatedProcess ||
199 server_->configuration().parentPort() != -1);
200
201 if (doWebSockets)
202 request_.enableWebSocket();
203
204 LOG_DEBUG(native() << "request: " << status);
205
206 if (status >= 300)
207 sendStockReply(status);
208 else {
209 if (request_.webSocketVersion >= 0) {
210 // replace 'http' with 'ws'
211 request_.urlScheme[0] = 'w';
212 request_.urlScheme[1] = 's';
213 strncpy(request_.urlScheme + 2, urlScheme() + 4, 7);
214 request_.urlScheme[9] = 0;
215 } else
216 strncpy(request_.urlScheme, urlScheme(), 9);
217
218 ReplyPtr reply;
219 try {
220 reply = request_handler_.handleRequest
221 (request_, lastWtReply_, lastProxyReply_, lastStaticReply_);
222 reply->setConnection(shared_from_this());
223 } catch (Wt::AsioWrapper::system_error& e) {
224 LOG_ERROR("Error in handleRequest0(): " << e.what());
225 handleError(e.code());
226 return;
227 }
228
229 rcv_body_buffer_ = false;
230 handleReadBody(reply);
231 }
232 } else if (!result) {
233 sendStockReply(StockReply::bad_request);
234 } else {
235 rcv_buffers_.push_back(Buffer());
236 startAsyncReadRequest(rcv_buffers_.back(),
237 request_parser_.initialState()
238 ? KEEPALIVE_TIMEOUT
239 : CONNECTION_TIMEOUT);
240 }
241 }
242
sendStockReply(StockReply::status_type status)243 void Connection::sendStockReply(StockReply::status_type status)
244 {
245 ReplyPtr reply
246 (new StockReply(request_, status, "", server_->configuration()));
247
248 reply->setConnection(shared_from_this());
249 reply->setCloseConnection();
250
251 startWriteResponse(reply);
252 }
253
handleReadRequest(const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)254 void Connection::handleReadRequest(const Wt::AsioWrapper::error_code& e,
255 std::size_t bytes_transferred)
256 {
257 LOG_DEBUG(native() << ": handleReadRequest(): " << e.message());
258
259 cancelReadTimer();
260
261 if (!e) {
262 rcv_remaining_ = rcv_buffers_.back().data();
263 rcv_buffer_size_ = bytes_transferred;
264 handleReadRequest0();
265 } else if (e != asio::error::operation_aborted &&
266 e != asio::error::bad_descriptor) {
267 handleError(e);
268 }
269 }
270
close()271 void Connection::close()
272 {
273 cancelReadTimer();
274 cancelWriteTimer();
275
276 LOG_DEBUG(native() << ": close()");
277
278 ConnectionManager_.stop(shared_from_this());
279 }
280
closed()281 bool Connection::closed() const
282 {
283 Connection *self = const_cast<Connection *>(this);
284 return !self->socket().is_open();
285 }
286
handleError(const Wt::AsioWrapper::error_code & e)287 void Connection::handleError(const Wt::AsioWrapper::error_code& e)
288 {
289 LOG_DEBUG(native() << ": error: " << e.message());
290
291 close();
292 }
293
handleReadBody(ReplyPtr reply)294 void Connection::handleReadBody(ReplyPtr reply)
295 {
296 if (request_.type != Request::WebSocket) {
297 /*
298 * For a WebSocket: reading and writing may happen in parallel,
299 * And writing and reading is asynchronous (post() from within
300 * WtReply::consumeWebSocketMessage()
301 */
302 haveResponse_ = false;
303 waitingResponse_ = true;
304 }
305
306 RequestParser::ParseResult result = request_parser_
307 .parseBody(request_, reply, rcv_remaining_,
308 rcv_buffers_.back().data() + rcv_buffer_size_);
309
310 if (request_.type != Request::WebSocket)
311 waitingResponse_ = false;
312
313 if (result == RequestParser::ReadMore) {
314 readMore(reply, BODY_TIMEOUT);
315 } else if (result == RequestParser::Done && haveResponse_)
316 startWriteResponse(reply);
317 }
318
readMore(ReplyPtr reply,int timeout)319 void Connection::readMore(ReplyPtr reply, int timeout)
320 {
321 if (!rcv_body_buffer_) {
322 rcv_body_buffer_ = true;
323 rcv_buffers_.push_back(Buffer());
324 }
325 startAsyncReadBody(reply, rcv_buffers_.back(), timeout);
326 }
327
readAvailable()328 bool Connection::readAvailable()
329 {
330 try {
331 return (rcv_remaining_ < rcv_buffers_.back().data() + rcv_buffer_size_)
332 || socket().available();
333 } catch (Wt::AsioWrapper::system_error& e) {
334 return false; // socket(): bad file descriptor
335 }
336 }
337
detectDisconnect(ReplyPtr reply,const std::function<void ()> & callback)338 void Connection::detectDisconnect(ReplyPtr reply,
339 const std::function<void()>& callback)
340 {
341 server_->service()
342 .post(strand_.wrap(std::bind(&Connection::asyncDetectDisconnect, this, reply, callback)));
343 }
344
asyncDetectDisconnect(ReplyPtr reply,const std::function<void ()> & callback)345 void Connection::asyncDetectDisconnect(ReplyPtr reply,
346 const std::function<void()>& callback)
347 {
348 if (disconnectCallback_)
349 return; // We're already detecting the disconnect
350
351 disconnectCallback_ = callback;
352
353 /*
354 * We do not actually expect to receive anything, and if we do, we'll close
355 * anyway (see below).
356 */
357 startAsyncReadBody(reply, rcv_buffers_.back(), 0);
358 }
359
handleReadBody0(ReplyPtr reply,const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)360 void Connection::handleReadBody0(ReplyPtr reply,
361 const Wt::AsioWrapper::error_code& e,
362 std::size_t bytes_transferred)
363 {
364 LOG_DEBUG(native() << ": handleReadBody0(): " << e.message());
365
366 if (disconnectCallback_) {
367 if (e && e != asio::error::operation_aborted) {
368 boost::function<void()> f = disconnectCallback_;
369 disconnectCallback_ = boost::function<void()>();
370 f();
371 } else if (!e) {
372 LOG_ERROR(native()
373 << ": handleReadBody(): while waiting for disconnect, "
374 "received unexpected data, closing");
375 close();
376 }
377
378 return;
379 }
380
381 cancelReadTimer();
382
383 if (!e) {
384 rcv_remaining_ = rcv_buffers_.back().data();
385 rcv_buffer_size_ = bytes_transferred;
386 handleReadBody(reply);
387 } else if (e != asio::error::operation_aborted
388 && e != asio::error::bad_descriptor) {
389 reply->consumeData(rcv_remaining_, rcv_remaining_, Request::Error);
390 handleError(e);
391 }
392 }
393
startWriteResponse(ReplyPtr reply)394 void Connection::startWriteResponse(ReplyPtr reply)
395 {
396 haveResponse_ = false;
397
398 if (disconnectCallback_)
399 socket().cancel();
400
401 if (state_ & Writing) {
402 LOG_ERROR("Connection::startWriteResponse(): connection already writing");
403 close();
404 server_->service()
405 .post(strand_.wrap(std::bind(&Reply::writeDone, reply, false)));
406 return;
407 }
408
409 std::vector<asio::const_buffer> buffers;
410 responseDone_ = reply->nextBuffers(buffers);
411
412 unsigned s = 0;
413 #ifdef DEBUG
414 for (unsigned i = 0; i < buffers.size(); ++i) {
415 int size = asio::buffer_size(buffers[i]);
416 s += size;
417 #ifdef DEBUG_DUMP
418 char *data = (char *)asio::detail::buffer_cast_helper(buffers[i]);
419 for (int j = 0; j < size; ++j)
420 std::cerr << data[j];
421 #endif
422 }
423 #endif
424
425 LOG_DEBUG(native() << " sending: " << s << "(buffers: "
426 << buffers.size() << ")");
427
428 if (!buffers.empty()) {
429 startAsyncWriteResponse(reply, buffers, BODY_TIMEOUT);
430 } else {
431 cancelWriteTimer();
432 handleWriteResponse(reply);
433 }
434 }
435
handleWriteResponse(ReplyPtr reply)436 void Connection::handleWriteResponse(ReplyPtr reply)
437 {
438 LOG_DEBUG(native() << ": handleWriteResponse() " <<
439 haveResponse_ << " " << responseDone_);
440 if (haveResponse_)
441 startWriteResponse(reply);
442 else {
443 if (!responseDone_) {
444 /*
445 * Keep reply open and wait for more data.
446 */
447 } else {
448 reply->logReply(request_handler_.logger());
449
450 if (reply->closeConnection())
451 ConnectionManager_.stop(shared_from_this());
452 else {
453 request_parser_.reset();
454 request_.reset();
455 responseDone_ = false;
456
457 while (rcv_buffers_.size() > 1)
458 rcv_buffers_.pop_front();
459
460 if (rcv_remaining_ < rcv_buffers_.back().data() + rcv_buffer_size_)
461 handleReadRequest0();
462 else
463 startAsyncReadRequest(rcv_buffers_.back(), KEEPALIVE_TIMEOUT);
464 }
465 }
466 }
467 }
468
handleWriteResponse0(ReplyPtr reply,const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)469 void Connection::handleWriteResponse0(ReplyPtr reply,
470 const Wt::AsioWrapper::error_code& e,
471 std::size_t bytes_transferred)
472 {
473 LOG_DEBUG(native() << ": handleWriteResponse0(): "
474 << bytes_transferred << " ; " << e.message());
475
476 cancelWriteTimer();
477
478 haveResponse_ = false;
479 waitingResponse_ = true;
480 reply->writeDone(!e);
481 waitingResponse_ = false;
482
483 if (!e) {
484 handleWriteResponse(reply);
485 } else {
486 if (e != asio::error::operation_aborted)
487 handleError(e);
488 }
489 }
490
491 } // namespace server
492 } // namespace http
493