1 /*
2 * SessionData.cpp
3 *
4 * Copyright (C) 2021 by RStudio, PBC
5 *
6 * Unless you have received this program directly from RStudio pursuant
7 * to the terms of a commercial license agreement with RStudio, then
8 * this program is licensed to you under the terms of version 3 of the
9 * GNU Affero General Public License. This program is distributed WITHOUT
10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13 *
14 */
15
16 #include "SessionData.hpp"
17
18 #include <core/Exec.hpp>
19
20 #include <r/RExec.hpp>
21 #include <r/RErrorCategory.hpp>
22 #include <r/RJson.hpp>
23 #include <r/RSourceManager.hpp>
24 #include <r/session/RSessionUtils.hpp>
25
26 #include <session/SessionModuleContext.hpp>
27 #include <session/SessionAsyncRProcess.hpp>
28
29 #include "DataViewer.hpp"
30
31 using namespace rstudio::core;
32
33 namespace rstudio {
34 namespace session {
35 namespace modules {
36 namespace data {
37
38 class AsyncDataPreviewRProcess : public async_r::AsyncRProcess
39 {
40 public:
create(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)41 static boost::shared_ptr<AsyncDataPreviewRProcess> create(
42 const json::JsonRpcRequest& request,
43 const json::JsonRpcFunctionContinuation& continuation)
44 {
45 boost::shared_ptr<AsyncDataPreviewRProcess> pDataPreview(
46 new AsyncDataPreviewRProcess(request, continuation));
47 pDataPreview->start();
48 return pDataPreview;
49 }
50
51 private:
AsyncDataPreviewRProcess(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)52 AsyncDataPreviewRProcess(
53 const json::JsonRpcRequest& request,
54 const json::JsonRpcFunctionContinuation& continuation) :
55 continuation_(continuation),
56 request_(request)
57 {
58 }
59
pathFromModulesSource(std::string sourceFile)60 FilePath pathFromModulesSource(std::string sourceFile)
61 {
62 FilePath modulesPath = session::options().modulesRSourcePath();
63 FilePath srcPath = modulesPath.completePath(sourceFile);
64
65 return srcPath;
66 }
67
pathFromSource(std::string sourceFile)68 FilePath pathFromSource(std::string sourceFile)
69 {
70 FilePath sourcesPath = session::options().coreRSourcePath();
71 FilePath srcPath = sourcesPath.completePath(sourceFile);
72
73 return srcPath;
74 }
75
saveRDS(const json::JsonRpcRequest & request)76 Error saveRDS(const json::JsonRpcRequest& request)
77 {
78 r::sexp::Protect rProtect;
79 r::exec::RFunction rFunction("saveRDS");
80
81 rFunction.addParam(request.params);
82 rFunction.addParam("file", inputLocation_);
83
84 SEXP resultSEXP;
85 return rFunction.call(&resultSEXP, &rProtect);
86 }
87
start()88 void start()
89 {
90 inputLocation_ = module_context::tempFile("input", "rds").getAbsolutePath();
91 outputLocation_ = module_context::tempFile("output", "rds").getAbsolutePath();
92
93 Error err = saveRDS(request_);
94 if (err)
95 {
96 continuationWithError("Failed to prepare the operation.");
97 return;
98 }
99
100 std::string cmd = std::string(".rs.callWithRDS(") +
101 "\".rs.rpc.preview_data_import\", \"" +
102 inputLocation_ +
103 "\", \"" +
104 outputLocation_ +
105 "\")";
106
107 std::vector<core::FilePath> sources;
108 sources.push_back(pathFromSource("Tools.R"));
109 sources.push_back(pathFromModulesSource("ModuleTools.R"));
110 sources.push_back(pathFromModulesSource("SessionCodeTools.R"));
111 sources.push_back(pathFromModulesSource("SessionDataViewer.R"));
112 sources.push_back(pathFromModulesSource("SessionDataImportV2.R"));
113
114 async_r::AsyncRProcess::start(cmd.c_str(), FilePath(), async_r::R_PROCESS_VANILLA, sources);
115 }
116
readRDS(SEXP * pResult)117 Error readRDS(SEXP* pResult)
118 {
119 r::sexp::Protect rProtect;
120 r::exec::RFunction rFunction("readRDS");
121
122 rFunction.addParam(outputLocation_);
123 rFunction.call(pResult, &rProtect);
124
125 return Success();
126 }
127
continuationWithError(const char * message)128 void continuationWithError(const char* message)
129 {
130 json::Object jsonError;
131
132 jsonError["message"] = message;
133 if (errors_.size() > 0)
134 {
135 jsonError["message"] = json::toJsonArray(errors_);
136 }
137
138 json::Object jsonErrorResponse;
139 jsonErrorResponse["error"] = jsonError;
140
141 json::JsonRpcResponse response;
142 response.setResult(jsonErrorResponse);
143
144 continuation_(Success(), &response);
145 }
146
onCompleted(int exitStatus)147 void onCompleted(int exitStatus)
148 {
149 if (terminationRequested())
150 {
151 json::JsonRpcResponse response;
152 continuation_(Success(), &response);
153 return;
154 }
155
156 if (exitStatus != EXIT_SUCCESS)
157 {
158 continuationWithError("Operation finished with error code.");
159 return;
160 }
161
162 SEXP resultSEXP;;
163 Error error = readRDS(&resultSEXP);
164 if (error)
165 {
166 continuationWithError("Failed to complete the operation.");
167 return;
168 }
169 else
170 {
171 core::json::Value resultValue;
172 error = r::json::jsonValueFromObject(resultSEXP, &resultValue);
173 if (error)
174 {
175 continuationWithError("Failed to parse result from the operation execution.");
176 return;
177 }
178
179 json::JsonRpcResponse response;
180 response.setResult(resultValue);
181
182 continuation_(Success(), &response);
183 }
184 }
185
onStdout(const std::string & output)186 void onStdout(const std::string& output)
187 {
188 output_ += output;
189 }
190
onStderr(const std::string & output)191 void onStderr(const std::string& output)
192 {
193 errors_.push_back(output);
194 }
195
196 const json::JsonRpcFunctionContinuation continuation_;
197 json::JsonRpcRequest request_;
198 std::vector<std::string> errors_;
199 std::string output_;
200
201 std::string inputLocation_;
202 std::string outputLocation_;
203 };
204
205 boost::shared_ptr<AsyncDataPreviewRProcess> s_pActiveDataPreview;
206
getPreviewDataImportAsync(const json::JsonRpcRequest & request,const json::JsonRpcFunctionContinuation & continuation)207 bool getPreviewDataImportAsync(
208 const json::JsonRpcRequest& request,
209 const json::JsonRpcFunctionContinuation& continuation)
210 {
211 if (s_pActiveDataPreview &&
212 s_pActiveDataPreview->isRunning())
213 {
214 return true;
215 }
216 else
217 {
218 s_pActiveDataPreview = AsyncDataPreviewRProcess::create(request, continuation);
219 return false;
220 }
221 }
222
abortPreviewDataImportAsync(const json::JsonRpcRequest & request,json::JsonRpcResponse * pResponse)223 Error abortPreviewDataImportAsync(const json::JsonRpcRequest& request,
224 json::JsonRpcResponse* pResponse)
225 {
226 if (s_pActiveDataPreview &&
227 s_pActiveDataPreview->isRunning())
228 {
229 s_pActiveDataPreview->terminate();
230 }
231
232 return Success();
233 }
234
initialize()235 Error initialize()
236 {
237 using boost::bind;
238 using namespace session::module_context;
239 ExecBlock initBlock;
240 initBlock.addFunctions()
241 (data::viewer::initialize)
242 (bind(sourceModuleRFile, "SessionDataImport.R"))
243 (bind(sourceModuleRFile, "SessionDataImportV2.R"))
244 (bind(sourceModuleRFile, "SessionDataPreview.R"))
245 (bind(registerAsyncRpcMethod, "preview_data_import_async", getPreviewDataImportAsync))
246 (bind(registerRpcMethod, "preview_data_import_async_abort", abortPreviewDataImportAsync));
247
248 return initBlock.execute();
249 }
250
251 } // namespace data
252 } // namespace modules
253 } // namespace session
254 } // namespace rstudio
255
256