1 /*
2 * SessionRpc.cpp
3 *
4 * Copyright (C) 2021 by RStudio, PBC
5 *
6 * Unless you have received this program directly from RStudio pursuant
7 * to the terms of a commercial license agreement with RStudio, then
8 * this program is licensed to you under the terms of version 3 of the
9 * GNU Affero General Public License. This program is distributed WITHOUT
10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13 *
14 */
15
16 #include <string>
17
18 #include "SessionRpc.hpp"
19 #include "SessionHttpMethods.hpp"
20 #include "SessionClientEventQueue.hpp"
21 #include "SessionAsyncRpcConnection.hpp"
22
23 #include <shared_core/json/Json.hpp>
24 #include <core/json/JsonRpc.hpp>
25 #include <core/Exec.hpp>
26 #include <core/Log.hpp>
27
28 #include <r/RExec.hpp>
29 #include <r/RSexp.hpp>
30 #include <r/RJson.hpp>
31 #include <r/RJsonRpc.hpp>
32 #include <r/RRoutines.hpp>
33
34 using namespace rstudio::core;
35
36 namespace rstudio {
37 namespace session {
38 namespace {
39
// a delay used when processing RPC methods (used to simulate network latency);
// handleRpcRequest() only sleeps when this is greater than zero, so the
// default of -1 means "no delay"
int s_rpcDelayMs = -1;

// request URIs that isOfflineableRequest() reports as offlineable;
// filled in by rpc::initialize()
std::set<std::string> s_offlineableUris;

// json rpc methods, looked up by method name; the bool stored with each
// handler is true for handlers registered via registerRpcMethod(name, fn)
// (synchronous) and false for those registered via registerAsyncRpcMethod().
// allocated in rpc::initialize() and intentionally never freed
core::json::JsonRpcAsyncMethods* s_pJsonRpcMethods = nullptr;
47
endHandleRpcRequestDirect(boost::shared_ptr<HttpConnection> ptrConnection,boost::posix_time::ptime executeStartTime,const core::Error & executeError,json::JsonRpcResponse * pJsonRpcResponse)48 void endHandleRpcRequestDirect(boost::shared_ptr<HttpConnection> ptrConnection,
49 boost::posix_time::ptime executeStartTime,
50 const core::Error& executeError,
51 json::JsonRpcResponse* pJsonRpcResponse)
52 {
53 // return error or result then continue waiting for requests
54 if (executeError)
55 {
56 ptrConnection->sendJsonRpcError(executeError);
57 }
58 else
59 {
60 // allow modules to detect changes after rpc calls
61 if (!pJsonRpcResponse->suppressDetectChanges())
62 {
63 module_context::events().onDetectChanges(
64 module_context::ChangeSourceRPC);
65 }
66
67 // are there (or will there likely be) events pending?
68 // (if not then notify the client)
69 if ( !clientEventQueue().eventAddedSince(executeStartTime) &&
70 !pJsonRpcResponse->hasAfterResponse() )
71 {
72 pJsonRpcResponse->setField(kEventsPending, "false");
73 }
74
75 // send the response
76 ptrConnection->sendJsonRpcResponse(*pJsonRpcResponse);
77
78 // run after response if we have one (then detect changes again)
79 if (pJsonRpcResponse->hasAfterResponse())
80 {
81 pJsonRpcResponse->runAfterResponse();
82 if (!pJsonRpcResponse->suppressDetectChanges())
83 {
84 module_context::events().onDetectChanges(
85 module_context::ChangeSourceRPC);
86 }
87 }
88 }
89 }
90
91
saveJsonResponse(const core::Error & error,core::json::JsonRpcResponse * pSrc,core::Error * pError,core::json::JsonRpcResponse * pDest)92 void saveJsonResponse(const core::Error& error, core::json::JsonRpcResponse *pSrc,
93 core::Error *pError, core::json::JsonRpcResponse *pDest)
94 {
95 *pError = error;
96 *pDest = *pSrc;
97 }
98
99 // invoke an HTTP RPC directly from R.
rs_invokeRpc(SEXP name,SEXP args)100 SEXP rs_invokeRpc(SEXP name, SEXP args)
101 {
102 // generate RPC request from this R command
103 json::JsonRpcRequest request;
104 rpc::formatRpcRequest(name, args, &request);
105
106 // check to see if the RPC exists
107 auto it = s_pJsonRpcMethods->find(request.method);
108 if (it == s_pJsonRpcMethods->end())
109 {
110 // specified method doesn't exist
111 r::exec::error("Requested RPC method " + request.method + " does not exist.");
112 return R_NilValue;
113 }
114
115 std::pair<bool, json::JsonRpcAsyncFunction> reg = it->second;
116 json::JsonRpcAsyncFunction handlerFunction = reg.second;
117
118 if (!reg.first)
119 {
120 // this indicates an async RPC, which isn't currently handled
121 r::exec::error("Requested RPC method " + request.method + " is asynchronous.");
122 return R_NilValue;
123 }
124
125 // invoke handler and record response
126 core::json::JsonRpcResponse response;
127 core::Error rpcError = Success();
128 handlerFunction(request,
129 boost::bind(saveJsonResponse, _1, _2, &rpcError, &response));
130
131 // raise an R error if the RPC fails
132 if (rpcError)
133 {
134 r::exec::error(log::writeError(rpcError));
135 }
136
137 // emit formatted response if enabled
138 if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
139 {
140 std::cout << "<<<" << std::endl;
141 response.getRawResponse().writeFormatted(std::cout);
142 std::cout << std::endl;
143 }
144
145 // convert JSON response back to R
146 SEXP result = R_NilValue;
147 r::sexp::Protect protect;
148 result = r::sexp::create(response.result(), &protect);
149
150 // raise an R error if the RPC returns an error
151 rpc::raiseJsonRpcResponseError(response);
152
153 return result;
154 }
155
156 } // anonymous namespace
157
158
159 namespace module_context {
160
registerAsyncRpcMethod(const std::string & name,const core::json::JsonRpcAsyncFunction & function)161 Error registerAsyncRpcMethod(const std::string& name,
162 const core::json::JsonRpcAsyncFunction& function)
163 {
164 s_pJsonRpcMethods->insert(
165 std::make_pair(name, std::make_pair(false, function)));
166 return Success();
167 }
168
registerRpcMethod(const std::string & name,const core::json::JsonRpcFunction & function)169 Error registerRpcMethod(const std::string& name,
170 const core::json::JsonRpcFunction& function)
171 {
172 s_pJsonRpcMethods->insert(
173 std::make_pair(name,
174 std::make_pair(true, json::adaptToAsync(function))));
175 return Success();
176 }
177
registerRpcMethod(const core::json::JsonRpcAsyncMethod & method)178 void registerRpcMethod(const core::json::JsonRpcAsyncMethod& method)
179 {
180 s_pJsonRpcMethods->insert(method);
181 }
182
183 } // namespace module_context
184
185 namespace rpc {
186
formatRpcRequest(SEXP name,SEXP args,core::json::JsonRpcRequest * pRequest)187 void formatRpcRequest(SEXP name,
188 SEXP args,
189 core::json::JsonRpcRequest* pRequest)
190 {
191 // find name of RPC to invoke
192 std::string method = r::sexp::safeAsString(name, "");
193
194 // assemble a request
195 pRequest->method = method;
196
197 // form argument list; convert from R to JSON
198 core::json::Value rpcArgs;
199 Error error = r::json::jsonValueFromObject(args, &rpcArgs);
200 if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
201 std::cout << ">>>" << std::endl;
202 if (rpcArgs.getType() == json::Type::OBJECT)
203 {
204 // named pair parameters
205 pRequest->kwparams = rpcArgs.getValue<json::Object>();
206 if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
207 pRequest->kwparams.writeFormatted(std::cout);
208 }
209 else if (rpcArgs.getType() == json::Type::ARRAY)
210 {
211 // array parameters
212 pRequest->params = rpcArgs.getValue<json::Array>();
213 if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
214 pRequest->params.writeFormatted(std::cout);
215 }
216 if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
217 std::cout << std::endl;
218 }
219
raiseJsonRpcResponseError(json::JsonRpcResponse & response)220 void raiseJsonRpcResponseError(json::JsonRpcResponse& response)
221 {
222 // raise an R error if the RPC returns an error
223 if (response.error().getType() == json::Type::OBJECT)
224 {
225 // formulate verbose error string
226 json::Object err = response.error().getObject();
227 std::string message = err["message"].getString();
228 if (err.find("error") != err.end())
229 message += ", Error " + err["error"].getString();
230 if (err.find("category") != err.end())
231 message += ", Category " + err["category"].getString();
232 if (err.find("code") != err.end())
233 message += ", Code " + err["code"].getString();
234 if (err.find("location") != err.end())
235 message += " at " + err["location"].getString();
236
237 r::exec::error(message);
238 }
239 }
240
sendJsonAsyncPendingResponse(const core::json::JsonRpcRequest & request,boost::shared_ptr<HttpConnection> ptrConnection,std::string & asyncHandle)241 void sendJsonAsyncPendingResponse(const core::json::JsonRpcRequest &request,
242 boost::shared_ptr<HttpConnection> ptrConnection,
243 std::string &asyncHandle)
244 {
245 // indirect return (asyncHandle style)
246 json::JsonRpcResponse response;
247 response.setAsyncHandle(asyncHandle);
248 response.setField(kEventsPending, "false");
249 ptrConnection->sendJsonRpcResponse(response);
250 }
251
endHandleRpcRequestIndirect(const std::string & asyncHandle,const core::Error & executeError,json::JsonRpcResponse * pJsonRpcResponse)252 void endHandleRpcRequestIndirect(
253 const std::string& asyncHandle,
254 const core::Error& executeError,
255 json::JsonRpcResponse* pJsonRpcResponse)
256 {
257 json::JsonRpcResponse temp;
258 json::JsonRpcResponse& jsonRpcResponse =
259 pJsonRpcResponse ? *pJsonRpcResponse : temp;
260
261 if (executeError)
262 jsonRpcResponse.setError(executeError);
263
264 if (!jsonRpcResponse.hasField(kEventsPending))
265 jsonRpcResponse.setField(kEventsPending, "false");
266
267 json::Object value;
268 value["handle"] = asyncHandle;
269 value["response"] = jsonRpcResponse.getRawResponse();
270 ClientEvent evt(client_events::kAsyncCompletion, value);
271 module_context::enqueClientEvent(evt);
272
273 // run after response if we have one (then detect changes again)
274 if (pJsonRpcResponse->hasAfterResponse())
275 {
276 pJsonRpcResponse->runAfterResponse();
277 if (!pJsonRpcResponse->suppressDetectChanges())
278 {
279 module_context::events().onDetectChanges(
280 module_context::ChangeSourceRPC);
281 }
282 }
283 }
284
handleRpcRequest(const core::json::JsonRpcRequest & request,boost::shared_ptr<HttpConnection> ptrConnection,http_methods::ConnectionType connectionType)285 void handleRpcRequest(const core::json::JsonRpcRequest& request,
286 boost::shared_ptr<HttpConnection> ptrConnection,
287 http_methods::ConnectionType connectionType)
288 {
289 // delay handling this RPC if requested
290 if (s_rpcDelayMs > 0)
291 {
292 boost::this_thread::sleep_for(boost::chrono::milliseconds(s_rpcDelayMs));
293 }
294
295 // record the time just prior to execution of the event
296 // (so we can determine if any events were added during execution)
297 using namespace boost::posix_time;
298 ptime executeStartTime = microsec_clock::universal_time();
299
300 // execute the method
301 auto it = s_pJsonRpcMethods->find(request.method);
302 if (it != s_pJsonRpcMethods->end())
303 {
304 std::pair<bool, json::JsonRpcAsyncFunction> reg = it->second;
305 json::JsonRpcAsyncFunction handlerFunction = reg.second;
306
307 // For asyncRpc the http response was already sent - just call the handler and emit the event
308 if (ptrConnection->isAsyncRpc())
309 {
310 boost::shared_ptr<rpc::AsyncRpcConnection> asyncConn =
311 boost::static_pointer_cast<rpc::AsyncRpcConnection>(ptrConnection);
312 handlerFunction(request,
313 boost::bind(endHandleRpcRequestIndirect,
314 asyncConn->asyncHandle(),
315 _1,
316 _2));
317 }
318 // Sync rpc
319 else if (reg.first)
320 {
321 // direct return
322 handlerFunction(request,
323 boost::bind(endHandleRpcRequestDirect,
324 ptrConnection,
325 executeStartTime,
326 _1,
327 _2));
328 }
329 // registerAsyncRpc - http connection is still open, send the async response, then emit the event
330 else
331 {
332 std::string asyncHandle = core::system::generateUuid(true);
333 sendJsonAsyncPendingResponse(request, ptrConnection, asyncHandle);
334
335 handlerFunction(request,
336 boost::bind(endHandleRpcRequestIndirect,
337 asyncHandle,
338 _1,
339 _2));
340 }
341 }
342 else
343 {
344 Error executeError = Error(json::errc::MethodNotFound, ERROR_LOCATION);
345 executeError.addProperty("method", request.method);
346
347 // we need to know about these because they represent unexpected
348 // application states
349 LOG_ERROR(executeError);
350
351 if (ptrConnection->isAsyncRpc())
352 {
353 boost::shared_ptr<rpc::AsyncRpcConnection> asyncConn =
354 boost::static_pointer_cast<rpc::AsyncRpcConnection>(ptrConnection);
355 endHandleRpcRequestIndirect(asyncConn->asyncHandle(), executeError, nullptr);
356 }
357 else
358 endHandleRpcRequestDirect(ptrConnection, executeStartTime, executeError, nullptr);
359 }
360 }
361
setRpcDelay(int delayMs)362 void setRpcDelay(int delayMs)
363 {
364 s_rpcDelayMs = delayMs;
365 }
366
isOfflineableRequest(boost::shared_ptr<HttpConnection> ptrConnection)367 bool isOfflineableRequest(boost::shared_ptr<HttpConnection> ptrConnection)
368 {
369 // Only specific requests that do not use the R runtime are offlineable (e.g. save_document)
370 if (s_offlineableUris.find(ptrConnection->request().uri()) == s_offlineableUris.end())
371 return false;
372 return true;
373 }
374
initialize()375 Error initialize()
376 {
377 // intentionally allocate methods on the heap and let them leak
378 // (we had seen issues in the past where an abnormally terminated
379 // R process could leak the process stuck in the destructor of
380 // this map pegging the processor at 100%; avoid this by allowing
381 // the OS to clean up memory itself after the process is gone)
382 s_pJsonRpcMethods = new core::json::JsonRpcAsyncMethods;
383
384 RS_REGISTER_CALL_METHOD(rs_invokeRpc);
385
386 s_offlineableUris.insert("/rpc/save_document");
387 s_offlineableUris.insert("/rpc/save_document_diff");
388 s_offlineableUris.insert("/rpc/open_document");
389 s_offlineableUris.insert("/rpc/set_client_state");
390 s_offlineableUris.insert("/rpc/list_files");
391 s_offlineableUris.insert("/rpc/modify_document_properties");
392 s_offlineableUris.insert("/rpc/check_for_external_edit");
393 s_offlineableUris.insert("/rpc/start_terminal");
394 s_offlineableUris.insert("/rpc/process_start");
395 s_offlineableUris.insert("/rpc/process_notify_start");
396 s_offlineableUris.insert("/rpc/process_notify_visible");
397 s_offlineableUris.insert("/rpc/process_get_buffer_chunk");
398 s_offlineableUris.insert("/rpc/process_erase_buffer");
399 s_offlineableUris.insert("/rpc/process_set_size");
400 s_offlineableUris.insert("/rpc/get_source_template");
401 s_offlineableUris.insert("/rpc/new_document");
402
403 return Success();
404 }
405
406 } // namespace rpc
407 } // namespace session
408 } // namespace rstudio
409
410