1 #ifndef HTTPREQUEST_HPP
2 #define HTTPREQUEST_HPP
3
4 #include <map>
5 #include <iostream>
6
7 #include <functional>
8 #include <memory>
9 #include "libuv/include/uv.h"
10 #include "http-parser/http_parser.h"
11 #include "socket.h"
12 #include "webapplication.h"
13 #include "callbackqueue.h"
14 #include "httpresponse.h"
15 #include "utils.h"
16 #include "thread.h"
17 #include "auto_deleter.h"
18
19 enum Protocol {
20 HTTP,
21 WebSockets
22 };
23
24 // HttpRequest is a bit of a misnomer -- a HttpRequest object represents a
25 // single connection, on which multiple actual HTTP requests can be made.
26 class HttpRequest : public WebSocketConnectionCallbacks,
27 public std::enable_shared_from_this<HttpRequest>
28 {
29 private:
30 uv_loop_t* _pLoop;
31 std::shared_ptr<WebApplication> _pWebApplication;
32 VariantHandle _handle;
33 std::shared_ptr<Socket> _pSocket;
34 http_parser _parser;
35 Protocol _protocol;
36 std::string _url;
37 RequestHeaders _headers;
38 std::string _lastHeaderField;
39 std::shared_ptr<WebSocketConnection> _pWebSocketConnection;
40
41 // `_env` is an shared_ptr<Environment> instead of an Environment because it
42 // must be created and deleted on the main thread. However, the creation and
43 // deletion of HttpRequest objects happens on the background thread, and so
44 // the lifetime of the Environment can't be strictly tied to the lifetime of
45 // the HttpRequest. It is instantiated with a deleter function that ensures
46 // deletion happens on the main thread.
47 std::shared_ptr<Rcpp::Environment> _env;
48 void _newRequest();
49 void _initializeEnv();
50
51 // _ignoreNewData is used in cases where we rejected a request (by sending
52 // a response with a non-100 status code) before its body was received. We
53 // don't want to close the connection because the response might not be
54 // sent yet, but we don't want to parse any more data from this connection.
55 // (You would think uv_stop_read could be called, but it seems to prevent
56 // the response from being written as well.)
57 bool _ignoreNewData;
58
59 bool _is_closing;
60
61 // This starts false, and in the case of a connection upgrade, gets set to
62 // true after the headers are complete.
63 bool _is_upgrade;
64
65 void _parse_http_data(char* buf, const ssize_t n);
66 // Parse data that has been stored in the buffer.
67 void _parse_http_data_from_buffer();
68
69 bool _response_scheduled;
70 // True when the HttpRequest object is handling an HTTP request; gets set to
71 // false when the response is written.
72 bool _handling_request;
73
74 // For buffering the incoming HTTP request when data comes in while waiting
75 // for R to process headers.
76 std::vector<char> _requestBuffer;
77
78 // Most of the methods in HttpRequest run on a background thread. Some
79 // methods run on the main thread. This is used by the main-thread methods
80 // to schedule callbacks to run on the background thread.
81 CallbackQueue* _background_queue;
82
83 // Used to keep track of state when parsing headers. This is needed because
84 // sometimes the header fields and values can be split across multiple TCP
85 // messages, resulting in multiple calls to _on_header_field or
86 // _on_header_value.
87 enum LastHeaderState
88 {
89 START,
90 FIELD,
91 VALUE
92 };
93 LastHeaderState _last_header_state;
94
95 public:
HttpRequest(uv_loop_t * pLoop,std::shared_ptr<WebApplication> pWebApplication,std::shared_ptr<Socket> pSocket,CallbackQueue * backgroundQueue)96 HttpRequest(uv_loop_t* pLoop,
97 std::shared_ptr<WebApplication> pWebApplication,
98 std::shared_ptr<Socket> pSocket,
99 CallbackQueue* backgroundQueue)
100 : _pLoop(pLoop),
101 _pWebApplication(pWebApplication),
102 _pSocket(pSocket),
103 _protocol(HTTP),
104 _ignoreNewData(false),
105 _is_closing(false),
106 _is_upgrade(false),
107 _response_scheduled(false),
108 _handling_request(false),
109 _background_queue(backgroundQueue)
110 {
111 ASSERT_BACKGROUND_THREAD()
112 uv_tcp_init(pLoop, &_handle.tcp);
113 _handle.isTcp = true;
114 // This is used by the macro-defined callbacks like _on_request_read
115 _handle.stream.data = this;
116
117 http_parser_init(&_parser, HTTP_REQUEST);
118 // This is used by the macro-defined callbacks like _on_message_begin
119 _parser.data = this;
120
121 _last_header_state = START;
122 }
123
~HttpRequest()124 virtual ~HttpRequest() {
125 ASSERT_BACKGROUND_THREAD()
126 debug_log("HttpRequest::~HttpRequest", LOG_DEBUG);
127 _pWebSocketConnection.reset();
128 }
129
130 uv_stream_t* handle();
websocket()131 std::shared_ptr<WebSocketConnection> websocket() const {
132 return _pWebSocketConnection;
133 }
134 Address clientAddress();
135 Address serverAddress();
136 Rcpp::Environment& env();
137
138 void handleRequest();
139
140 std::string method() const;
141 std::string url() const;
142 const RequestHeaders& headers() const;
143
144 bool hasHeader(const std::string& name) const;
145 bool hasHeader(const std::string& name, const std::string& value, bool ci = false) const;
146 std::string getHeader(const std::string& name) const;
147
148 // Is the request an Upgrade (i.e. WebSocket connection)?
149 bool isUpgrade() const;
150
151 void sendWSFrame(const char* pHeader, size_t headerSize,
152 const char* pData, size_t dataSize,
153 const char* pFooter, size_t footerSize);
154 void closeWSSocket();
155
156 // Call this function from the main thread to indicate that a response has
157 // been scheduled. This is needed because sometimes by the time the main
158 // thread knows that it needs to send a response, the bg thread will have
159 // kept going and scheduled another call into the main thread to send a
160 // response.
161 void responseScheduled();
162 bool isResponseScheduled();
163
164 // This function should be called when a single request has been completed
165 // (when the response has been sent). It is currently used to detect
166 // pipelined HTTP requests.
167 void requestCompleted();
168
169 void _call_r_on_ws_open();
170 void _schedule_on_headers_complete_complete(std::shared_ptr<HttpResponse> pResponse);
171 void _on_headers_complete_complete(std::shared_ptr<HttpResponse> pResponse);
172 void _schedule_on_body_error(std::shared_ptr<HttpResponse> pResponse);
173 void _on_body_error(std::shared_ptr<HttpResponse> pResponse);
174 void _schedule_on_message_complete_complete(std::shared_ptr<HttpResponse> pResponse);
175 void _on_message_complete_complete(std::shared_ptr<HttpResponse> pResponse);
176
177 public:
178 // Callbacks
179 virtual int _on_message_begin(http_parser* pParser);
180 virtual int _on_url(http_parser* pParser, const char* pAt, size_t length);
181 virtual int _on_status(http_parser* pParser, const char* pAt, size_t length);
182 virtual int _on_header_field(http_parser* pParser, const char* pAt, size_t length);
183 virtual int _on_header_value(http_parser* pParser, const char* pAt, size_t length);
184 virtual int _on_headers_complete(http_parser* pParser);
185 virtual int _on_body(http_parser* pParser, const char* pAt, size_t length);
186 virtual int _on_message_complete(http_parser* pParser);
187
188 virtual void onWSMessage(bool binary, const char* data, size_t len);
189 virtual void onWSClose(int code);
190
191 // Update whether or not this HttpRequest is to be upgraded. This is called
192 // from _on_headers_complete().
193 void updateUpgradeStatus();
194
195 void _on_closed(uv_handle_t* handle);
196 void close();
197 void schedule_close();
198 void _on_request_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf);
199 void _on_response_write(int status);
200
_initializeSocket()201 void _initializeSocket() {
202 // Coerce to parent class
203 std::shared_ptr<WebSocketConnectionCallbacks> this_base(
204 std::static_pointer_cast<WebSocketConnectionCallbacks>(shared_from_this())
205 );
206
207 _pWebSocketConnection = std::shared_ptr<WebSocketConnection>(
208 new WebSocketConnection(this_base),
209 auto_deleter_background<WebSocketConnection>
210 );
211
212 _pSocket->addConnection(shared_from_this());
213 }
214 };
215
216
217 // Same for Websocketconnection
218 // Factory function needed because we can't call shared_from_this() inside the
219 // constructor.
createHttpRequest(uv_loop_t * pLoop,std::shared_ptr<WebApplication> pWebApplication,std::shared_ptr<Socket> pSocket,CallbackQueue * backgroundQueue)220 inline std::shared_ptr<HttpRequest> createHttpRequest(
221 uv_loop_t* pLoop,
222 std::shared_ptr<WebApplication> pWebApplication,
223 std::shared_ptr<Socket> pSocket,
224 CallbackQueue* backgroundQueue)
225 {
226 ASSERT_BACKGROUND_THREAD()
227
228 // The shared_ptr has a custom deleter which ensures that the HttpRequest is
229 // deleted on the background thread.
230 std::shared_ptr<HttpRequest> req(
231 new HttpRequest(pLoop, pWebApplication, pSocket, backgroundQueue),
232 auto_deleter_background<HttpRequest>
233 );
234
235 req->_initializeSocket();
236
237 return req;
238 }
239
240
241 #define DECLARE_CALLBACK_1(type, function_name, return_type, type_1) \
242 return_type type##_##function_name(type_1 arg1);
243 #define DECLARE_CALLBACK_3(type, function_name, return_type, type_1, type_2, type_3) \
244 return_type type##_##function_name(type_1 arg1, type_2 arg2, type_3 arg3);
245 #define DECLARE_CALLBACK_2(type, function_name, return_type, type_1, type_2) \
246 return_type type##_##function_name(type_1 arg1, type_2 arg2);
247
248 DECLARE_CALLBACK_1(HttpRequest, on_message_begin, int, http_parser*)
249 DECLARE_CALLBACK_3(HttpRequest, on_url, int, http_parser*, const char*, size_t)
250 DECLARE_CALLBACK_3(HttpRequest, on_status, int, http_parser*, const char*, size_t)
251 DECLARE_CALLBACK_3(HttpRequest, on_header_field, int, http_parser*, const char*, size_t)
252 DECLARE_CALLBACK_3(HttpRequest, on_header_value, int, http_parser*, const char*, size_t)
253 DECLARE_CALLBACK_1(HttpRequest, on_headers_complete, int, http_parser*)
254 DECLARE_CALLBACK_3(HttpRequest, on_body, int, http_parser*, const char*, size_t)
255 DECLARE_CALLBACK_1(HttpRequest, on_message_complete, int, http_parser*)
256 DECLARE_CALLBACK_1(HttpRequest, on_closed, void, uv_handle_t*)
257 DECLARE_CALLBACK_3(HttpRequest, on_request_read, void, uv_stream_t*, ssize_t, const uv_buf_t*)
258 DECLARE_CALLBACK_2(HttpRequest, on_response_write, void, uv_write_t*, int)
259
260
261 #endif // HTTPREQUEST_HPP
262