1 /*
2  * SessionData.cpp
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 
16 #include "SessionData.hpp"
17 
18 #include <core/Exec.hpp>
19 
20 #include <r/RExec.hpp>
21 #include <r/RErrorCategory.hpp>
22 #include <r/RJson.hpp>
23 #include <r/RSourceManager.hpp>
24 #include <r/session/RSessionUtils.hpp>
25 
26 #include <session/SessionModuleContext.hpp>
27 #include <session/SessionAsyncRProcess.hpp>
28 
29 #include "DataViewer.hpp"
30 
31 using namespace rstudio::core;
32 
33 namespace rstudio {
34 namespace session {
35 namespace modules {
36 namespace data {
37 
38 class AsyncDataPreviewRProcess : public async_r::AsyncRProcess
39 {
40 public:
create(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)41    static boost::shared_ptr<AsyncDataPreviewRProcess> create(
42            const json::JsonRpcRequest& request,
43            const json::JsonRpcFunctionContinuation& continuation)
44    {
45       boost::shared_ptr<AsyncDataPreviewRProcess> pDataPreview(
46                   new AsyncDataPreviewRProcess(request, continuation));
47       pDataPreview->start();
48       return pDataPreview;
49    }
50 
51 private:
AsyncDataPreviewRProcess(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)52    AsyncDataPreviewRProcess(
53            const json::JsonRpcRequest& request,
54            const json::JsonRpcFunctionContinuation& continuation) :
55        continuation_(continuation),
56        request_(request)
57    {
58    }
59 
pathFromModulesSource(std::string sourceFile)60    FilePath pathFromModulesSource(std::string sourceFile)
61    {
62       FilePath modulesPath = session::options().modulesRSourcePath();
63       FilePath srcPath = modulesPath.completePath(sourceFile);
64 
65       return srcPath;
66    }
67 
pathFromSource(std::string sourceFile)68    FilePath pathFromSource(std::string sourceFile)
69    {
70       FilePath sourcesPath = session::options().coreRSourcePath();
71       FilePath srcPath = sourcesPath.completePath(sourceFile);
72 
73       return srcPath;
74    }
75 
saveRDS(const json::JsonRpcRequest & request)76    Error saveRDS(const json::JsonRpcRequest& request)
77    {
78       r::sexp::Protect rProtect;
79       r::exec::RFunction rFunction("saveRDS");
80 
81       rFunction.addParam(request.params);
82       rFunction.addParam("file", inputLocation_);
83 
84       SEXP resultSEXP;
85       return rFunction.call(&resultSEXP, &rProtect);
86    }
87 
start()88    void start()
89    {
90       inputLocation_ = module_context::tempFile("input", "rds").getAbsolutePath();
91       outputLocation_ = module_context::tempFile("output", "rds").getAbsolutePath();
92 
93       Error err = saveRDS(request_);
94       if (err)
95       {
96          continuationWithError("Failed to prepare the operation.");
97          return;
98       }
99 
100       std::string cmd = std::string(".rs.callWithRDS(") +
101         "\".rs.rpc.preview_data_import\", \"" +
102         inputLocation_ +
103         "\", \"" +
104         outputLocation_ +
105         "\")";
106 
107       std::vector<core::FilePath> sources;
108       sources.push_back(pathFromSource("Tools.R"));
109       sources.push_back(pathFromModulesSource("ModuleTools.R"));
110       sources.push_back(pathFromModulesSource("SessionCodeTools.R"));
111       sources.push_back(pathFromModulesSource("SessionDataViewer.R"));
112       sources.push_back(pathFromModulesSource("SessionDataImportV2.R"));
113 
114       async_r::AsyncRProcess::start(cmd.c_str(), FilePath(), async_r::R_PROCESS_VANILLA, sources);
115    }
116 
readRDS(SEXP * pResult)117    Error readRDS(SEXP* pResult)
118    {
119        r::sexp::Protect rProtect;
120        r::exec::RFunction rFunction("readRDS");
121 
122        rFunction.addParam(outputLocation_);
123        rFunction.call(pResult, &rProtect);
124 
125        return Success();
126    }
127 
continuationWithError(const char * message)128    void continuationWithError(const char* message)
129    {
130       json::Object jsonError;
131 
132       jsonError["message"] = message;
133       if (errors_.size() > 0)
134       {
135          jsonError["message"] = json::toJsonArray(errors_);
136       }
137 
138       json::Object jsonErrorResponse;
139       jsonErrorResponse["error"] = jsonError;
140 
141       json::JsonRpcResponse response;
142       response.setResult(jsonErrorResponse);
143 
144       continuation_(Success(), &response);
145    }
146 
onCompleted(int exitStatus)147    void onCompleted(int exitStatus)
148    {
149       if (terminationRequested())
150       {
151          json::JsonRpcResponse response;
152          continuation_(Success(), &response);
153          return;
154       }
155 
156       if (exitStatus != EXIT_SUCCESS)
157       {
158          continuationWithError("Operation finished with error code.");
159          return;
160       }
161 
162       SEXP resultSEXP;;
163       Error error = readRDS(&resultSEXP);
164       if (error)
165       {
166          continuationWithError("Failed to complete the operation.");
167          return;
168       }
169       else
170       {
171          core::json::Value resultValue;
172          error = r::json::jsonValueFromObject(resultSEXP, &resultValue);
173          if (error)
174          {
175             continuationWithError("Failed to parse result from the operation execution.");
176             return;
177          }
178 
179          json::JsonRpcResponse response;
180          response.setResult(resultValue);
181 
182          continuation_(Success(), &response);
183       }
184    }
185 
onStdout(const std::string & output)186    void onStdout(const std::string& output)
187    {
188       output_ += output;
189    }
190 
onStderr(const std::string & output)191    void onStderr(const std::string& output)
192    {
193       errors_.push_back(output);
194    }
195 
196    const json::JsonRpcFunctionContinuation continuation_;
197    json::JsonRpcRequest request_;
198    std::vector<std::string> errors_;
199    std::string output_;
200 
201    std::string inputLocation_;
202    std::string outputLocation_;
203 };
204 
205 boost::shared_ptr<AsyncDataPreviewRProcess> s_pActiveDataPreview;
206 
getPreviewDataImportAsync(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)207 bool getPreviewDataImportAsync(
208         const json::JsonRpcRequest& request,
209         const json::JsonRpcFunctionContinuation& continuation)
210 {
211    if (s_pActiveDataPreview &&
212        s_pActiveDataPreview->isRunning())
213    {
214       return true;
215    }
216    else
217    {
218       s_pActiveDataPreview = AsyncDataPreviewRProcess::create(request, continuation);
219       return false;
220    }
221 }
222 
abortPreviewDataImportAsync(const json::JsonRpcRequest & request,json::JsonRpcResponse * pResponse)223 Error abortPreviewDataImportAsync(const json::JsonRpcRequest& request,
224                                  json::JsonRpcResponse* pResponse)
225 {
226    if (s_pActiveDataPreview &&
227        s_pActiveDataPreview->isRunning())
228    {
229       s_pActiveDataPreview->terminate();
230    }
231 
232    return Success();
233 }
234 
initialize()235 Error initialize()
236 {
237    using boost::bind;
238    using namespace session::module_context;
239    ExecBlock initBlock;
240    initBlock.addFunctions()
241       (data::viewer::initialize)
242       (bind(sourceModuleRFile, "SessionDataImport.R"))
243       (bind(sourceModuleRFile, "SessionDataImportV2.R"))
244       (bind(sourceModuleRFile, "SessionDataPreview.R"))
245       (bind(registerAsyncRpcMethod, "preview_data_import_async", getPreviewDataImportAsync))
246       (bind(registerRpcMethod, "preview_data_import_async_abort", abortPreviewDataImportAsync));
247 
248    return initBlock.execute();
249 }
250 
251 } // namespace data
252 } // namespace modules
253 } // namespace session
254 } // namespace rstudio
255 
256