1 /*
2  * Copyright (C) 2008 Emweb bv, Herent, Belgium.
3  *
4  * All rights reserved.
5  */
6 //
7 // connection.cpp
8 // ~~~~~~~~~~~~~~
9 //
10 // Copyright (c) 2003-2006 Christopher M. Kohlhoff (chris at kohlhoff dot com)
11 //
12 // Distributed under the Boost Software License, Version 1.0. (See accompanying
13 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
14 //
15 
16 #define DEBUG
17 
18 #include <vector>
19 
20 #include "Connection.h"
21 #include "ConnectionManager.h"
22 #include "RequestHandler.h"
23 #include "StockReply.h"
24 #include "Server.h"
25 #include "WebController.h"
26 
27 namespace Wt {
28   LOGGER("wthttp/async");
29 }
30 
31 #if BOOST_VERSION >= 104900
32 typedef std::chrono::seconds asio_timer_seconds;
33 #else
34 typedef boost::posix_time::seconds asio_timer_seconds;
35 #endif
36 
37 namespace http {
38 namespace server {
39 
40 static const int CONNECTION_TIMEOUT = 300; // 5 minutes
41 static const int BODY_TIMEOUT = 600;       // 10 minutes
42 static const int KEEPALIVE_TIMEOUT  = 10;  // 10 seconds
43 
Connection(asio::io_service & io_service,Server * server,ConnectionManager & manager,RequestHandler & handler)44 Connection::Connection(asio::io_service& io_service, Server *server,
45     ConnectionManager& manager, RequestHandler& handler)
46   : ConnectionManager_(manager),
47     strand_(io_service),
48     state_(Idle),
49     request_handler_(handler),
50     readTimer_(io_service),
51     writeTimer_(io_service),
52     request_parser_(server),
53     server_(server),
54     waitingResponse_(false),
55     haveResponse_(false),
56     responseDone_(false)
57 { }
58 
~Connection()59 Connection::~Connection()
60 {
61   LOG_DEBUG("~Connection");
62 }
63 
64 #if (defined(WT_ASIO_IS_BOOST_ASIO) && BOOST_VERSION >= 106600) || (defined(WT_ASIO_IS_STANDALONE_ASIO) && ASIO_VERSION >= 101100)
native()65 asio::ip::tcp::socket::native_handle_type Connection::native()
66 {
67   return socket().native_handle();
68 }
69 #else
native()70 asio::ip::tcp::socket::native_type Connection::native()
71 {
72   return socket().native();
73 }
74 #endif
75 
finishReply()76 void Connection::finishReply()
77 {
78   if (!request_.uri.empty()) {
79     LOG_DEBUG("last request: " << request_.method.str()
80 	      << " " << request_.uri.str()
81 	      << " (ws:" << request_.webSocketVersion << ")");
82   }
83 }
84 
scheduleStop()85 void Connection::scheduleStop()
86 {
87   server_->service()
88     .post(strand_.wrap(std::bind(&Connection::stop, shared_from_this())));
89 }
90 
start()91 void Connection::start()
92 {
93   LOG_DEBUG(native() << ": start()");
94 
95   request_parser_.reset();
96   request_.reset();
97   try {
98     request_.remoteIP = socket().remote_endpoint().address().to_string();
99     request_.port = socket().local_endpoint().port();
100   } catch (std::exception& e) {
101     LOG_ERROR("remote_endpoint() threw: " << e.what());
102   }
103 
104   Wt::AsioWrapper::error_code ignored_ec;
105   socket().set_option(asio::ip::tcp::no_delay(true), ignored_ec);
106 
107   rcv_buffers_.push_back(Buffer());
108   startAsyncReadRequest(rcv_buffers_.back(), CONNECTION_TIMEOUT);
109 }
110 
stop()111 void Connection::stop()
112 {
113   lastWtReply_.reset();
114   lastProxyReply_.reset();
115   lastStaticReply_.reset();
116 }
117 
setReadTimeout(int seconds)118 void Connection::setReadTimeout(int seconds)
119 {
120   if (seconds != 0) {
121     LOG_DEBUG(native() << " setting read timeout (ws: "
122 	      << request_.webSocketVersion << ")");
123     state_ |= Reading;
124 
125     readTimer_.expires_from_now(asio_timer_seconds(seconds));
126     readTimer_.async_wait(std::bind(&Connection::timeout, shared_from_this(),
127 				    std::placeholders::_1));
128   }
129 }
130 
setWriteTimeout(int seconds)131 void Connection::setWriteTimeout(int seconds)
132 {
133   LOG_DEBUG(native() << " setting write timeout (ws: "
134 	    << request_.webSocketVersion << ")");
135   state_ |= Writing;
136 
137   writeTimer_.expires_from_now(asio_timer_seconds(seconds));
138   writeTimer_.async_wait(std::bind(&Connection::timeout, shared_from_this(),
139 				   std::placeholders::_1));
140 }
141 
cancelReadTimer()142 void Connection::cancelReadTimer()
143 {
144   LOG_DEBUG(native() << " cancel read timeout");
145   state_.clear(Reading);
146 
147   readTimer_.cancel();
148 }
149 
cancelWriteTimer()150 void Connection::cancelWriteTimer()
151 {
152   LOG_DEBUG(native() << " cancel write timeout");
153   state_.clear(Writing);
154 
155   writeTimer_.cancel();
156 }
157 
timeout(const Wt::AsioWrapper::error_code & e)158 void Connection::timeout(const Wt::AsioWrapper::error_code& e)
159 {
160   if (e != asio::error::operation_aborted)
161     strand_.post(std::bind(&Connection::doTimeout, shared_from_this()));
162 }
163 
doTimeout()164 void Connection::doTimeout()
165 {
166   Wt::AsioWrapper::error_code ignored_ec;
167   socket().shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
168   readTimer_.cancel();
169   writeTimer_.cancel();
170 }
171 
handleReadRequest0()172 void Connection::handleReadRequest0()
173 {
174   Buffer& buffer = rcv_buffers_.back();
175 
176 #ifdef DEBUG
177   try {
178     LOG_DEBUG(socket().native_handle() << "incoming request: "
179 	      << socket().remote_endpoint().port() << " (avail= "
180 	      << (rcv_buffer_size_ - (rcv_remaining_ - buffer.data())) << "): "
181 	      << std::string(rcv_remaining_,
182 			     std::min((unsigned long)(buffer.data()
183 				      - rcv_remaining_ + rcv_buffer_size_),
184 				      (long unsigned)1000)));
185   } catch (...) {
186   }
187 #endif // DEBUG
188 
189   boost::tribool result;
190   boost::tie(result, rcv_remaining_)
191     = request_parser_.parse(request_,
192 			    &*rcv_remaining_, buffer.data() + rcv_buffer_size_);
193 
194   if (result) {
195     Reply::status_type status = request_parser_.validate(request_);
196     // FIXME: Let the reply decide whether we're doing websockets, move this logic to WtReply
197     bool doWebSockets = server_->controller()->configuration().webSockets() &&
198 			(server_->controller()->configuration().sessionPolicy() != Wt::Configuration::DedicatedProcess ||
199 			 server_->configuration().parentPort() != -1);
200 
201     if (doWebSockets)
202       request_.enableWebSocket();
203 
204     LOG_DEBUG(native() << "request: " << status);
205 
206     if (status >= 300)
207       sendStockReply(status);
208     else {
209       if (request_.webSocketVersion >= 0) {
210 	// replace 'http' with 'ws'
211 	request_.urlScheme[0] = 'w';
212 	request_.urlScheme[1] = 's';
213 	strncpy(request_.urlScheme + 2, urlScheme() + 4, 7);
214 	request_.urlScheme[9] = 0;
215       } else
216         strncpy(request_.urlScheme, urlScheme(), 9);
217 
218       ReplyPtr reply;
219       try {
220 	reply = request_handler_.handleRequest
221 	  (request_, lastWtReply_, lastProxyReply_, lastStaticReply_);
222 	reply->setConnection(shared_from_this());
223       } catch (Wt::AsioWrapper::system_error& e) {
224 	LOG_ERROR("Error in handleRequest0(): " << e.what());
225 	handleError(e.code());
226 	return;
227       }
228 
229       rcv_body_buffer_ = false;
230       handleReadBody(reply);
231     }
232   } else if (!result) {
233     sendStockReply(StockReply::bad_request);
234   } else {
235     rcv_buffers_.push_back(Buffer());
236     startAsyncReadRequest(rcv_buffers_.back(),
237 			  request_parser_.initialState()
238 			  ? KEEPALIVE_TIMEOUT
239 			  : CONNECTION_TIMEOUT);
240   }
241 }
242 
sendStockReply(StockReply::status_type status)243 void Connection::sendStockReply(StockReply::status_type status)
244 {
245   ReplyPtr reply
246     (new StockReply(request_, status, "", server_->configuration()));
247 
248   reply->setConnection(shared_from_this());
249   reply->setCloseConnection();
250 
251   startWriteResponse(reply);
252 }
253 
handleReadRequest(const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)254 void Connection::handleReadRequest(const Wt::AsioWrapper::error_code& e,
255 				   std::size_t bytes_transferred)
256 {
257   LOG_DEBUG(native() << ": handleReadRequest(): " << e.message());
258 
259   cancelReadTimer();
260 
261   if (!e) {
262     rcv_remaining_ = rcv_buffers_.back().data();
263     rcv_buffer_size_ = bytes_transferred;
264     handleReadRequest0();
265   } else if (e != asio::error::operation_aborted &&
266 	     e != asio::error::bad_descriptor) {
267     handleError(e);
268   }
269 }
270 
close()271 void Connection::close()
272 {
273   cancelReadTimer();
274   cancelWriteTimer();
275 
276   LOG_DEBUG(native() << ": close()");
277 
278   ConnectionManager_.stop(shared_from_this());
279 }
280 
closed()281 bool Connection::closed() const
282 {
283   Connection *self = const_cast<Connection *>(this);
284   return !self->socket().is_open();
285 }
286 
handleError(const Wt::AsioWrapper::error_code & e)287 void Connection::handleError(const Wt::AsioWrapper::error_code& e)
288 {
289   LOG_DEBUG(native() << ": error: " << e.message());
290 
291   close();
292 }
293 
handleReadBody(ReplyPtr reply)294 void Connection::handleReadBody(ReplyPtr reply)
295 {
296   if (request_.type != Request::WebSocket) {
297     /*
298      * For a WebSocket: reading and writing may happen in parallel,
299      * And writing and reading is asynchronous (post() from within
300      * WtReply::consumeWebSocketMessage()
301      */
302     haveResponse_ = false;
303     waitingResponse_ = true;
304   }
305 
306   RequestParser::ParseResult result = request_parser_
307     .parseBody(request_, reply, rcv_remaining_,
308 	       rcv_buffers_.back().data() + rcv_buffer_size_);
309 
310   if (request_.type != Request::WebSocket)
311     waitingResponse_ = false;
312 
313   if (result == RequestParser::ReadMore) {
314     readMore(reply, BODY_TIMEOUT);
315   } else if (result == RequestParser::Done && haveResponse_)
316     startWriteResponse(reply);
317 }
318 
readMore(ReplyPtr reply,int timeout)319 void Connection::readMore(ReplyPtr reply, int timeout)
320 {
321   if (!rcv_body_buffer_) {
322     rcv_body_buffer_ = true;
323     rcv_buffers_.push_back(Buffer());
324   }
325   startAsyncReadBody(reply, rcv_buffers_.back(), timeout);
326 }
327 
readAvailable()328 bool Connection::readAvailable()
329 {
330   try {
331     return (rcv_remaining_ < rcv_buffers_.back().data() + rcv_buffer_size_)
332       || socket().available();
333   } catch (Wt::AsioWrapper::system_error& e) {
334     return false; // socket(): bad file descriptor
335   }
336 }
337 
detectDisconnect(ReplyPtr reply,const std::function<void ()> & callback)338 void Connection::detectDisconnect(ReplyPtr reply,
339 				  const std::function<void()>& callback)
340 {
341   server_->service()
342     .post(strand_.wrap(std::bind(&Connection::asyncDetectDisconnect, this, reply, callback)));
343 }
344 
asyncDetectDisconnect(ReplyPtr reply,const std::function<void ()> & callback)345 void Connection::asyncDetectDisconnect(ReplyPtr reply,
346 				       const std::function<void()>& callback)
347 {
348   if (disconnectCallback_)
349     return; // We're already detecting the disconnect
350 
351   disconnectCallback_ = callback;
352 
353   /*
354    * We do not actually expect to receive anything, and if we do, we'll close
355    * anyway (see below).
356    */
357   startAsyncReadBody(reply, rcv_buffers_.back(), 0);
358 }
359 
handleReadBody0(ReplyPtr reply,const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)360 void Connection::handleReadBody0(ReplyPtr reply,
361                                  const Wt::AsioWrapper::error_code& e,
362 				 std::size_t bytes_transferred)
363 {
364   LOG_DEBUG(native() << ": handleReadBody0(): " << e.message());
365 
366   if (disconnectCallback_) {
367     if (e && e != asio::error::operation_aborted) {
368       boost::function<void()> f = disconnectCallback_;
369       disconnectCallback_ = boost::function<void()>();
370       f();
371     } else if (!e) {
372       LOG_ERROR(native()
373 		<< ": handleReadBody(): while waiting for disconnect, "
374 		"received unexpected data, closing");
375       close();
376     }
377 
378     return;
379   }
380 
381   cancelReadTimer();
382 
383   if (!e) {
384     rcv_remaining_ = rcv_buffers_.back().data();
385     rcv_buffer_size_ = bytes_transferred;
386     handleReadBody(reply);
387   } else if (e != asio::error::operation_aborted
388 	     && e != asio::error::bad_descriptor) {
389     reply->consumeData(rcv_remaining_, rcv_remaining_, Request::Error);
390     handleError(e);
391   }
392 }
393 
startWriteResponse(ReplyPtr reply)394 void Connection::startWriteResponse(ReplyPtr reply)
395 {
396   haveResponse_ = false;
397 
398   if (disconnectCallback_)
399     socket().cancel();
400 
401   if (state_ & Writing) {
402     LOG_ERROR("Connection::startWriteResponse(): connection already writing");
403     close();
404     server_->service()
405       .post(strand_.wrap(std::bind(&Reply::writeDone, reply, false)));
406     return;
407   }
408 
409   std::vector<asio::const_buffer> buffers;
410   responseDone_ = reply->nextBuffers(buffers);
411 
412   unsigned s = 0;
413 #ifdef DEBUG
414   for (unsigned i = 0; i < buffers.size(); ++i) {
415     int size = asio::buffer_size(buffers[i]);
416     s += size;
417 #ifdef DEBUG_DUMP
418     char *data = (char *)asio::detail::buffer_cast_helper(buffers[i]);
419     for (int j = 0; j < size; ++j)
420       std::cerr << data[j];
421 #endif
422   }
423 #endif
424 
425   LOG_DEBUG(native() << " sending: " << s << "(buffers: "
426 	    << buffers.size() << ")");
427 
428   if (!buffers.empty()) {
429     startAsyncWriteResponse(reply, buffers, BODY_TIMEOUT);
430   } else {
431     cancelWriteTimer();
432     handleWriteResponse(reply);
433   }
434 }
435 
handleWriteResponse(ReplyPtr reply)436 void Connection::handleWriteResponse(ReplyPtr reply)
437 {
438   LOG_DEBUG(native() << ": handleWriteResponse() " <<
439 	    haveResponse_ << " " << responseDone_);
440   if (haveResponse_)
441     startWriteResponse(reply);
442   else {
443     if (!responseDone_) {
444       /*
445        * Keep reply open and wait for more data.
446        */
447     } else {
448       reply->logReply(request_handler_.logger());
449 
450       if (reply->closeConnection())
451 	ConnectionManager_.stop(shared_from_this());
452       else {
453 	request_parser_.reset();
454 	request_.reset();
455 	responseDone_ = false;
456 
457 	while (rcv_buffers_.size() > 1)
458 	  rcv_buffers_.pop_front();
459 
460 	if (rcv_remaining_ < rcv_buffers_.back().data() + rcv_buffer_size_)
461 	  handleReadRequest0();
462 	else
463 	  startAsyncReadRequest(rcv_buffers_.back(), KEEPALIVE_TIMEOUT);
464       }
465     }
466   }
467 }
468 
handleWriteResponse0(ReplyPtr reply,const Wt::AsioWrapper::error_code & e,std::size_t bytes_transferred)469 void Connection::handleWriteResponse0(ReplyPtr reply,
470                                       const Wt::AsioWrapper::error_code& e,
471 				      std::size_t bytes_transferred)
472 {
473   LOG_DEBUG(native() << ": handleWriteResponse0(): "
474 	    << bytes_transferred << " ; " << e.message());
475 
476   cancelWriteTimer();
477 
478   haveResponse_ = false;
479   waitingResponse_ = true;
480   reply->writeDone(!e);
481   waitingResponse_ = false;
482 
483   if (!e) {
484     handleWriteResponse(reply);
485   } else {
486     if (e != asio::error::operation_aborted)
487       handleError(e);
488   }
489 }
490 
491 } // namespace server
492 } // namespace http
493