1 #ifndef HTTPREQUEST_HPP
2 #define HTTPREQUEST_HPP
3 
4 #include <map>
5 #include <iostream>
6 
7 #include <functional>
8 #include <memory>
9 #include "libuv/include/uv.h"
10 #include "http-parser/http_parser.h"
11 #include "socket.h"
12 #include "webapplication.h"
13 #include "callbackqueue.h"
14 #include "httpresponse.h"
15 #include "utils.h"
16 #include "thread.h"
17 #include "auto_deleter.h"
18 
19 enum Protocol {
20   HTTP,
21   WebSockets
22 };
23 
24 // HttpRequest is a bit of a misnomer -- a HttpRequest object represents a
25 // single connection, on which multiple actual HTTP requests can be made.
26 class HttpRequest : public WebSocketConnectionCallbacks,
27                     public std::enable_shared_from_this<HttpRequest>
28 {
29 private:
30   uv_loop_t* _pLoop;
31   std::shared_ptr<WebApplication> _pWebApplication;
32   VariantHandle _handle;
33   std::shared_ptr<Socket> _pSocket;
34   http_parser _parser;
35   Protocol _protocol;
36   std::string _url;
37   RequestHeaders _headers;
38   std::string _lastHeaderField;
39   std::shared_ptr<WebSocketConnection> _pWebSocketConnection;
40 
41   // `_env` is an shared_ptr<Environment> instead of an Environment because it
42   // must be created and deleted on the main thread. However, the creation and
43   // deletion of HttpRequest objects happens on the background thread, and so
44   // the lifetime of the Environment can't be strictly tied to the lifetime of
45   // the HttpRequest. It is instantiated with a deleter function that ensures
46   // deletion happens on the main thread.
47   std::shared_ptr<Rcpp::Environment> _env;
48   void _newRequest();
49   void _initializeEnv();
50 
51   // _ignoreNewData is used in cases where we rejected a request (by sending
52   // a response with a non-100 status code) before its body was received. We
53   // don't want to close the connection because the response might not be
54   // sent yet, but we don't want to parse any more data from this connection.
55   // (You would think uv_stop_read could be called, but it seems to prevent
56   // the response from being written as well.)
57   bool _ignoreNewData;
58 
59   bool _is_closing;
60 
61   // This starts false, and in the case of a connection upgrade, gets set to
62   // true after the headers are complete.
63   bool _is_upgrade;
64 
65   void _parse_http_data(char* buf, const ssize_t n);
66   // Parse data that has been stored in the buffer.
67   void _parse_http_data_from_buffer();
68 
69   bool _response_scheduled;
70   // True when the HttpRequest object is handling an HTTP request; gets set to
71   // false when the response is written.
72   bool _handling_request;
73 
74   // For buffering the incoming HTTP request when data comes in while waiting
75   // for R to process headers.
76   std::vector<char> _requestBuffer;
77 
78   // Most of the methods in HttpRequest run on a background thread. Some
79   // methods run on the main thread. This is used by the main-thread methods
80   // to schedule callbacks to run on the background thread.
81   CallbackQueue* _background_queue;
82 
83   // Used to keep track of state when parsing headers. This is needed because
84   // sometimes the header fields and values can be split across multiple TCP
85   // messages, resulting in multiple calls to _on_header_field or
86   // _on_header_value.
87   enum LastHeaderState
88   {
89     START,
90     FIELD,
91     VALUE
92   };
93   LastHeaderState _last_header_state;
94 
95 public:
HttpRequest(uv_loop_t * pLoop,std::shared_ptr<WebApplication> pWebApplication,std::shared_ptr<Socket> pSocket,CallbackQueue * backgroundQueue)96   HttpRequest(uv_loop_t* pLoop,
97               std::shared_ptr<WebApplication> pWebApplication,
98               std::shared_ptr<Socket> pSocket,
99               CallbackQueue* backgroundQueue)
100     : _pLoop(pLoop),
101       _pWebApplication(pWebApplication),
102       _pSocket(pSocket),
103       _protocol(HTTP),
104       _ignoreNewData(false),
105       _is_closing(false),
106       _is_upgrade(false),
107       _response_scheduled(false),
108       _handling_request(false),
109       _background_queue(backgroundQueue)
110   {
111     ASSERT_BACKGROUND_THREAD()
112     uv_tcp_init(pLoop, &_handle.tcp);
113     _handle.isTcp = true;
114     // This is used by the macro-defined callbacks like _on_request_read
115     _handle.stream.data = this;
116 
117     http_parser_init(&_parser, HTTP_REQUEST);
118     // This is used by the macro-defined callbacks like _on_message_begin
119     _parser.data = this;
120 
121     _last_header_state = START;
122   }
123 
~HttpRequest()124   virtual ~HttpRequest() {
125     ASSERT_BACKGROUND_THREAD()
126     debug_log("HttpRequest::~HttpRequest", LOG_DEBUG);
127     _pWebSocketConnection.reset();
128   }
129 
130   uv_stream_t* handle();
websocket()131   std::shared_ptr<WebSocketConnection> websocket() const {
132     return _pWebSocketConnection;
133   }
134   Address clientAddress();
135   Address serverAddress();
136   Rcpp::Environment& env();
137 
138   void handleRequest();
139 
140   std::string method() const;
141   std::string url() const;
142   const RequestHeaders& headers() const;
143 
144   bool hasHeader(const std::string& name) const;
145   bool hasHeader(const std::string& name, const std::string& value, bool ci = false) const;
146   std::string getHeader(const std::string& name) const;
147 
148   // Is the request an Upgrade (i.e. WebSocket connection)?
149   bool isUpgrade() const;
150 
151   void sendWSFrame(const char* pHeader, size_t headerSize,
152                    const char* pData, size_t dataSize,
153                    const char* pFooter, size_t footerSize);
154   void closeWSSocket();
155 
156   // Call this function from the main thread to indicate that a response has
157   // been scheduled. This is needed because sometimes by the time the main
158   // thread knows that it needs to send a response, the bg thread will have
159   // kept going and scheduled another call into the main thread to send a
160   // response.
161   void responseScheduled();
162   bool isResponseScheduled();
163 
164   // This function should be called when a single request has been completed
165   // (when the response has been sent). It is currently used to detect
166   // pipelined HTTP requests.
167   void requestCompleted();
168 
169   void _call_r_on_ws_open();
170   void _schedule_on_headers_complete_complete(std::shared_ptr<HttpResponse> pResponse);
171   void _on_headers_complete_complete(std::shared_ptr<HttpResponse> pResponse);
172   void _schedule_on_body_error(std::shared_ptr<HttpResponse> pResponse);
173   void _on_body_error(std::shared_ptr<HttpResponse> pResponse);
174   void _schedule_on_message_complete_complete(std::shared_ptr<HttpResponse> pResponse);
175   void _on_message_complete_complete(std::shared_ptr<HttpResponse> pResponse);
176 
177 public:
178   // Callbacks
179   virtual int _on_message_begin(http_parser* pParser);
180   virtual int _on_url(http_parser* pParser, const char* pAt, size_t length);
181   virtual int _on_status(http_parser* pParser, const char* pAt, size_t length);
182   virtual int _on_header_field(http_parser* pParser, const char* pAt, size_t length);
183   virtual int _on_header_value(http_parser* pParser, const char* pAt, size_t length);
184   virtual int _on_headers_complete(http_parser* pParser);
185   virtual int _on_body(http_parser* pParser, const char* pAt, size_t length);
186   virtual int _on_message_complete(http_parser* pParser);
187 
188   virtual void onWSMessage(bool binary, const char* data, size_t len);
189   virtual void onWSClose(int code);
190 
191   // Update whether or not this HttpRequest is to be upgraded. This is called
192   // from _on_headers_complete().
193   void updateUpgradeStatus();
194 
195   void _on_closed(uv_handle_t* handle);
196   void close();
197   void schedule_close();
198   void _on_request_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf);
199   void _on_response_write(int status);
200 
_initializeSocket()201   void _initializeSocket() {
202     // Coerce to parent class
203     std::shared_ptr<WebSocketConnectionCallbacks> this_base(
204       std::static_pointer_cast<WebSocketConnectionCallbacks>(shared_from_this())
205     );
206 
207     _pWebSocketConnection = std::shared_ptr<WebSocketConnection>(
208       new WebSocketConnection(this_base),
209       auto_deleter_background<WebSocketConnection>
210     );
211 
212     _pSocket->addConnection(shared_from_this());
213   }
214 };
215 
216 
217 // Same for Websocketconnection
218 // Factory function needed because we can't call shared_from_this() inside the
219 // constructor.
createHttpRequest(uv_loop_t * pLoop,std::shared_ptr<WebApplication> pWebApplication,std::shared_ptr<Socket> pSocket,CallbackQueue * backgroundQueue)220 inline std::shared_ptr<HttpRequest> createHttpRequest(
221   uv_loop_t* pLoop,
222   std::shared_ptr<WebApplication> pWebApplication,
223   std::shared_ptr<Socket> pSocket,
224   CallbackQueue* backgroundQueue)
225 {
226   ASSERT_BACKGROUND_THREAD()
227 
228   // The shared_ptr has a custom deleter which ensures that the HttpRequest is
229   // deleted on the background thread.
230   std::shared_ptr<HttpRequest> req(
231     new HttpRequest(pLoop, pWebApplication, pSocket, backgroundQueue),
232     auto_deleter_background<HttpRequest>
233   );
234 
235   req->_initializeSocket();
236 
237   return req;
238 }
239 
240 
241 #define DECLARE_CALLBACK_1(type, function_name, return_type, type_1) \
242   return_type type##_##function_name(type_1 arg1);
243 #define DECLARE_CALLBACK_3(type, function_name, return_type, type_1, type_2, type_3) \
244   return_type type##_##function_name(type_1 arg1, type_2 arg2, type_3 arg3);
245 #define DECLARE_CALLBACK_2(type, function_name, return_type, type_1, type_2) \
246   return_type type##_##function_name(type_1 arg1, type_2 arg2);
247 
248 DECLARE_CALLBACK_1(HttpRequest, on_message_begin, int, http_parser*)
249 DECLARE_CALLBACK_3(HttpRequest, on_url, int, http_parser*, const char*, size_t)
250 DECLARE_CALLBACK_3(HttpRequest, on_status, int, http_parser*, const char*, size_t)
251 DECLARE_CALLBACK_3(HttpRequest, on_header_field, int, http_parser*, const char*, size_t)
252 DECLARE_CALLBACK_3(HttpRequest, on_header_value, int, http_parser*, const char*, size_t)
253 DECLARE_CALLBACK_1(HttpRequest, on_headers_complete, int, http_parser*)
254 DECLARE_CALLBACK_3(HttpRequest, on_body, int, http_parser*, const char*, size_t)
255 DECLARE_CALLBACK_1(HttpRequest, on_message_complete, int, http_parser*)
256 DECLARE_CALLBACK_1(HttpRequest, on_closed, void, uv_handle_t*)
257 DECLARE_CALLBACK_3(HttpRequest, on_request_read, void, uv_stream_t*, ssize_t, const uv_buf_t*)
258 DECLARE_CALLBACK_2(HttpRequest, on_response_write, void, uv_write_t*, int)
259 
260 
261 #endif // HTTPREQUEST_HPP
262