1 /*
2  * SessionRpc.cpp
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 
16 #include <string>
17 
18 #include "SessionRpc.hpp"
19 #include "SessionHttpMethods.hpp"
20 #include "SessionClientEventQueue.hpp"
21 #include "SessionAsyncRpcConnection.hpp"
22 
23 #include <shared_core/json/Json.hpp>
24 #include <core/json/JsonRpc.hpp>
25 #include <core/Exec.hpp>
26 #include <core/Log.hpp>
27 
28 #include <r/RExec.hpp>
29 #include <r/RSexp.hpp>
30 #include <r/RJson.hpp>
31 #include <r/RJsonRpc.hpp>
32 #include <r/RRoutines.hpp>
33 
34 using namespace rstudio::core;
35 
36 namespace rstudio {
37 namespace session {
38 namespace {
39 
40 // a delay used when processing RPC methods (used to simulate network latency)
41 int s_rpcDelayMs = -1;
42 
43 std::set<std::string> s_offlineableUris;
44 
45 // json rpc methods
46 core::json::JsonRpcAsyncMethods* s_pJsonRpcMethods = nullptr;
47 
endHandleRpcRequestDirect(boost::shared_ptr<HttpConnection> ptrConnection,boost::posix_time::ptime executeStartTime,const core::Error & executeError,json::JsonRpcResponse * pJsonRpcResponse)48 void endHandleRpcRequestDirect(boost::shared_ptr<HttpConnection> ptrConnection,
49                          boost::posix_time::ptime executeStartTime,
50                          const core::Error& executeError,
51                          json::JsonRpcResponse* pJsonRpcResponse)
52 {
53    // return error or result then continue waiting for requests
54    if (executeError)
55    {
56       ptrConnection->sendJsonRpcError(executeError);
57    }
58    else
59    {
60       // allow modules to detect changes after rpc calls
61       if (!pJsonRpcResponse->suppressDetectChanges())
62       {
63          module_context::events().onDetectChanges(
64                module_context::ChangeSourceRPC);
65       }
66 
67       // are there (or will there likely be) events pending?
68       // (if not then notify the client)
69       if ( !clientEventQueue().eventAddedSince(executeStartTime) &&
70            !pJsonRpcResponse->hasAfterResponse() )
71       {
72          pJsonRpcResponse->setField(kEventsPending, "false");
73       }
74 
75       // send the response
76       ptrConnection->sendJsonRpcResponse(*pJsonRpcResponse);
77 
78       // run after response if we have one (then detect changes again)
79       if (pJsonRpcResponse->hasAfterResponse())
80       {
81          pJsonRpcResponse->runAfterResponse();
82          if (!pJsonRpcResponse->suppressDetectChanges())
83          {
84             module_context::events().onDetectChanges(
85                   module_context::ChangeSourceRPC);
86          }
87       }
88    }
89 }
90 
91 
saveJsonResponse(const core::Error & error,core::json::JsonRpcResponse * pSrc,core::Error * pError,core::json::JsonRpcResponse * pDest)92 void saveJsonResponse(const core::Error& error, core::json::JsonRpcResponse *pSrc,
93                       core::Error *pError,      core::json::JsonRpcResponse *pDest)
94 {
95    *pError = error;
96    *pDest = *pSrc;
97 }
98 
99 // invoke an HTTP RPC directly from R.
rs_invokeRpc(SEXP name,SEXP args)100 SEXP rs_invokeRpc(SEXP name, SEXP args)
101 {
102    // generate RPC request from this R command
103    json::JsonRpcRequest request;
104    rpc::formatRpcRequest(name, args, &request);
105 
106    // check to see if the RPC exists
107    auto it = s_pJsonRpcMethods->find(request.method);
108    if (it == s_pJsonRpcMethods->end())
109    {
110       // specified method doesn't exist
111       r::exec::error("Requested RPC method " + request.method + " does not exist.");
112       return R_NilValue;
113    }
114 
115    std::pair<bool, json::JsonRpcAsyncFunction> reg = it->second;
116    json::JsonRpcAsyncFunction handlerFunction = reg.second;
117 
118    if (!reg.first)
119    {
120       // this indicates an async RPC, which isn't currently handled
121       r::exec::error("Requested RPC method " + request.method + " is asynchronous.");
122       return R_NilValue;
123    }
124 
125    // invoke handler and record response
126    core::json::JsonRpcResponse response;
127    core::Error rpcError = Success();
128    handlerFunction(request,
129                    boost::bind(saveJsonResponse, _1, _2, &rpcError, &response));
130 
131    // raise an R error if the RPC fails
132    if (rpcError)
133    {
134       r::exec::error(log::writeError(rpcError));
135    }
136 
137    // emit formatted response if enabled
138    if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
139    {
140       std::cout << "<<<" << std::endl;
141       response.getRawResponse().writeFormatted(std::cout);
142       std::cout << std::endl;
143    }
144 
145    // convert JSON response back to R
146    SEXP result = R_NilValue;
147    r::sexp::Protect protect;
148    result = r::sexp::create(response.result(), &protect);
149 
150    // raise an R error if the RPC returns an error
151    rpc::raiseJsonRpcResponseError(response);
152 
153    return result;
154 }
155 
156 } // anonymous namespace
157 
158 
159 namespace module_context {
160 
registerAsyncRpcMethod(const std::string & name,const core::json::JsonRpcAsyncFunction & function)161 Error registerAsyncRpcMethod(const std::string& name,
162                              const core::json::JsonRpcAsyncFunction& function)
163 {
164    s_pJsonRpcMethods->insert(
165          std::make_pair(name, std::make_pair(false, function)));
166    return Success();
167 }
168 
registerRpcMethod(const std::string & name,const core::json::JsonRpcFunction & function)169 Error registerRpcMethod(const std::string& name,
170                         const core::json::JsonRpcFunction& function)
171 {
172    s_pJsonRpcMethods->insert(
173          std::make_pair(name,
174                         std::make_pair(true, json::adaptToAsync(function))));
175    return Success();
176 }
177 
registerRpcMethod(const core::json::JsonRpcAsyncMethod & method)178 void registerRpcMethod(const core::json::JsonRpcAsyncMethod& method)
179 {
180    s_pJsonRpcMethods->insert(method);
181 }
182 
183 } // namespace module_context
184 
185 namespace rpc {
186 
formatRpcRequest(SEXP name,SEXP args,core::json::JsonRpcRequest * pRequest)187 void formatRpcRequest(SEXP name,
188                       SEXP args,
189                       core::json::JsonRpcRequest* pRequest)
190 {
191    // find name of RPC to invoke
192    std::string method = r::sexp::safeAsString(name, "");
193 
194    // assemble a request
195    pRequest->method = method;
196 
197    // form argument list; convert from R to JSON
198    core::json::Value rpcArgs;
199    Error error = r::json::jsonValueFromObject(args, &rpcArgs);
200    if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
201       std::cout << ">>>" << std::endl;
202    if (rpcArgs.getType() == json::Type::OBJECT)
203    {
204       // named pair parameters
205       pRequest->kwparams = rpcArgs.getValue<json::Object>();
206       if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
207          pRequest->kwparams.writeFormatted(std::cout);
208    }
209    else if (rpcArgs.getType() == json::Type::ARRAY)
210    {
211       // array parameters
212       pRequest->params = rpcArgs.getValue<json::Array>();
213       if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
214          pRequest->params.writeFormatted(std::cout);
215    }
216    if (!core::system::getenv("RSTUDIO_SESSION_RPC_DEBUG").empty())
217       std::cout << std::endl;
218 }
219 
raiseJsonRpcResponseError(json::JsonRpcResponse & response)220 void raiseJsonRpcResponseError(json::JsonRpcResponse& response)
221 {
222    // raise an R error if the RPC returns an error
223    if (response.error().getType() == json::Type::OBJECT)
224    {
225       // formulate verbose error string
226       json::Object err = response.error().getObject();
227       std::string message = err["message"].getString();
228       if (err.find("error") != err.end())
229          message += ", Error " + err["error"].getString();
230       if (err.find("category") != err.end())
231          message += ", Category " + err["category"].getString();
232       if (err.find("code") != err.end())
233          message += ", Code " + err["code"].getString();
234       if (err.find("location") != err.end())
235          message += " at " + err["location"].getString();
236 
237       r::exec::error(message);
238    }
239 }
240 
sendJsonAsyncPendingResponse(const core::json::JsonRpcRequest & request,boost::shared_ptr<HttpConnection> ptrConnection,std::string & asyncHandle)241 void sendJsonAsyncPendingResponse(const core::json::JsonRpcRequest &request,
242                                   boost::shared_ptr<HttpConnection> ptrConnection,
243                                   std::string &asyncHandle)
244 {
245    // indirect return (asyncHandle style)
246    json::JsonRpcResponse response;
247    response.setAsyncHandle(asyncHandle);
248    response.setField(kEventsPending, "false");
249    ptrConnection->sendJsonRpcResponse(response);
250 }
251 
endHandleRpcRequestIndirect(const std::string & asyncHandle,const core::Error & executeError,json::JsonRpcResponse * pJsonRpcResponse)252 void endHandleRpcRequestIndirect(
253         const std::string& asyncHandle,
254         const core::Error& executeError,
255         json::JsonRpcResponse* pJsonRpcResponse)
256 {
257    json::JsonRpcResponse temp;
258    json::JsonRpcResponse& jsonRpcResponse =
259            pJsonRpcResponse ? *pJsonRpcResponse : temp;
260 
261    if (executeError)
262       jsonRpcResponse.setError(executeError);
263 
264    if (!jsonRpcResponse.hasField(kEventsPending))
265       jsonRpcResponse.setField(kEventsPending, "false");
266 
267    json::Object value;
268    value["handle"] = asyncHandle;
269    value["response"] = jsonRpcResponse.getRawResponse();
270    ClientEvent evt(client_events::kAsyncCompletion, value);
271    module_context::enqueClientEvent(evt);
272 
273    // run after response if we have one (then detect changes again)
274    if (pJsonRpcResponse->hasAfterResponse())
275    {
276       pJsonRpcResponse->runAfterResponse();
277       if (!pJsonRpcResponse->suppressDetectChanges())
278       {
279          module_context::events().onDetectChanges(
280                  module_context::ChangeSourceRPC);
281       }
282    }
283 }
284 
handleRpcRequest(const core::json::JsonRpcRequest & request,boost::shared_ptr<HttpConnection> ptrConnection,http_methods::ConnectionType connectionType)285 void handleRpcRequest(const core::json::JsonRpcRequest& request,
286                       boost::shared_ptr<HttpConnection> ptrConnection,
287                       http_methods::ConnectionType connectionType)
288 {
289    // delay handling this RPC if requested
290    if (s_rpcDelayMs > 0)
291    {
292       boost::this_thread::sleep_for(boost::chrono::milliseconds(s_rpcDelayMs));
293    }
294 
295    // record the time just prior to execution of the event
296    // (so we can determine if any events were added during execution)
297    using namespace boost::posix_time;
298    ptime executeStartTime = microsec_clock::universal_time();
299 
300    // execute the method
301    auto it = s_pJsonRpcMethods->find(request.method);
302    if (it != s_pJsonRpcMethods->end())
303    {
304       std::pair<bool, json::JsonRpcAsyncFunction> reg = it->second;
305       json::JsonRpcAsyncFunction handlerFunction = reg.second;
306 
307       // For asyncRpc the http response was already sent - just call the handler and emit the event
308       if (ptrConnection->isAsyncRpc())
309       {
310          boost::shared_ptr<rpc::AsyncRpcConnection> asyncConn =
311                  boost::static_pointer_cast<rpc::AsyncRpcConnection>(ptrConnection);
312          handlerFunction(request,
313                          boost::bind(endHandleRpcRequestIndirect,
314                                      asyncConn->asyncHandle(),
315                                      _1,
316                                      _2));
317       }
318       // Sync rpc
319       else if (reg.first)
320       {
321          // direct return
322          handlerFunction(request,
323                          boost::bind(endHandleRpcRequestDirect,
324                                      ptrConnection,
325                                      executeStartTime,
326                                      _1,
327                                      _2));
328       }
329       // registerAsyncRpc - http connection is still open, send the async response, then emit the event
330       else
331       {
332          std::string asyncHandle = core::system::generateUuid(true);
333          sendJsonAsyncPendingResponse(request, ptrConnection, asyncHandle);
334 
335          handlerFunction(request,
336                          boost::bind(endHandleRpcRequestIndirect,
337                                      asyncHandle,
338                                      _1,
339                                      _2));
340       }
341    }
342    else
343    {
344       Error executeError = Error(json::errc::MethodNotFound, ERROR_LOCATION);
345       executeError.addProperty("method", request.method);
346 
347       // we need to know about these because they represent unexpected
348       // application states
349       LOG_ERROR(executeError);
350 
351       if (ptrConnection->isAsyncRpc())
352       {
353          boost::shared_ptr<rpc::AsyncRpcConnection> asyncConn =
354                  boost::static_pointer_cast<rpc::AsyncRpcConnection>(ptrConnection);
355          endHandleRpcRequestIndirect(asyncConn->asyncHandle(), executeError, nullptr);
356       }
357       else
358          endHandleRpcRequestDirect(ptrConnection, executeStartTime, executeError, nullptr);
359    }
360 }
361 
setRpcDelay(int delayMs)362 void setRpcDelay(int delayMs)
363 {
364    s_rpcDelayMs = delayMs;
365 }
366 
isOfflineableRequest(boost::shared_ptr<HttpConnection> ptrConnection)367 bool isOfflineableRequest(boost::shared_ptr<HttpConnection> ptrConnection)
368 {
369    // Only specific requests that do not use the R runtime are offlineable (e.g. save_document)
370    if (s_offlineableUris.find(ptrConnection->request().uri()) == s_offlineableUris.end())
371       return false;
372    return true;
373 }
374 
initialize()375 Error initialize()
376 {
377    // intentionally allocate methods on the heap and let them leak
378    // (we had seen issues in the past where an abnormally terminated
379    // R process could leak the process stuck in the destructor of
380    // this map pegging the processor at 100%; avoid this by allowing
381    // the OS to clean up memory itself after the process is gone)
382    s_pJsonRpcMethods = new core::json::JsonRpcAsyncMethods;
383 
384    RS_REGISTER_CALL_METHOD(rs_invokeRpc);
385 
386    s_offlineableUris.insert("/rpc/save_document");
387    s_offlineableUris.insert("/rpc/save_document_diff");
388    s_offlineableUris.insert("/rpc/open_document");
389    s_offlineableUris.insert("/rpc/set_client_state");
390    s_offlineableUris.insert("/rpc/list_files");
391    s_offlineableUris.insert("/rpc/modify_document_properties");
392    s_offlineableUris.insert("/rpc/check_for_external_edit");
393    s_offlineableUris.insert("/rpc/start_terminal");
394    s_offlineableUris.insert("/rpc/process_start");
395    s_offlineableUris.insert("/rpc/process_notify_start");
396    s_offlineableUris.insert("/rpc/process_notify_visible");
397    s_offlineableUris.insert("/rpc/process_get_buffer_chunk");
398    s_offlineableUris.insert("/rpc/process_erase_buffer");
399    s_offlineableUris.insert("/rpc/process_set_size");
400    s_offlineableUris.insert("/rpc/get_source_template");
401    s_offlineableUris.insert("/rpc/new_document");
402 
403    return Success();
404 }
405 
406 } // namespace rpc
407 } // namespace session
408 } // namespace rstudio
409 
410