1 /*
2 * JobsApi.cpp
3 *
4 * Copyright (C) 2021 by RStudio, PBC
5 *
6 * Unless you have received this program directly from RStudio pursuant
7 * to the terms of a commercial license agreement with RStudio, then
8 * this program is licensed to you under the terms of version 3 of the
9 * GNU Affero General Public License. This program is distributed WITHOUT
10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13 *
14 */
15
16 #include <session/jobs/JobsApi.hpp>
17 #include <session/SessionModuleContext.hpp>
18
19 enum JobUpdateType
20 {
21 JobAdded = 0,
22 JobUpdated = 1,
23 JobRemoved = 2
24 };
25
26 using namespace rstudio::core;
27
28 namespace rstudio {
29 namespace session {
30 namespace modules {
31 namespace jobs {
32
33 namespace {
34
35 // map of job ID to jobs
36 std::map<std::string, boost::shared_ptr<Job> > s_jobs;
37
38 // notify client that job has been updated
notifyClient(JobUpdateType update,boost::shared_ptr<Job> pJob)39 void notifyClient(JobUpdateType update, boost::shared_ptr<Job> pJob)
40 {
41 json::Object data;
42 data["type"] = static_cast<int>(update);
43 data["job"] = pJob->toJson();
44 module_context::enqueClientEvent(
45 ClientEvent(client_events::kJobUpdated, data));
46 }
47
processUpdate(boost::shared_ptr<Job> pJob)48 void processUpdate(boost::shared_ptr<Job> pJob)
49 {
50 if (pJob->complete() && pJob->autoRemove())
51 {
52 // if this job is now complete, and the job wants to be removed when complete, remove it
53 removeJob(pJob);
54 }
55 else
56 {
57 // otherwise, notify the client of the changes in the job
58 notifyClient(JobUpdated, pJob);
59 }
60 }
61
62 } // anonymous namespace
63
removeJob(boost::shared_ptr<Job> pJob)64 void removeJob(boost::shared_ptr<Job> pJob)
65 {
66 notifyClient(JobRemoved, pJob);
67 pJob->cleanup();
68 s_jobs.erase(s_jobs.find(pJob->id()));
69 }
70
lookupJob(const std::string & id,boost::shared_ptr<Job> * pJob)71 bool lookupJob(const std::string& id, boost::shared_ptr<Job> *pJob)
72 {
73 auto it = s_jobs.find(id);
74 if (it == s_jobs.end())
75 return false;
76 *pJob = it->second;
77 return true;
78 }
79
addJob(const std::string & name,const std::string & status,const std::string & group,int progress,bool confirmTermination,JobState state,JobType type,bool autoRemove,SEXP actions,JobActions cppActions,bool show,std::vector<std::string> tags)80 boost::shared_ptr<Job> addJob(
81 const std::string& name,
82 const std::string& status,
83 const std::string& group,
84 int progress,
85 bool confirmTermination,
86 JobState state,
87 JobType type,
88 bool autoRemove,
89 SEXP actions,
90 JobActions cppActions,
91 bool show,
92 std::vector<std::string> tags)
93 {
94 // find an unused job id
95 std::string id;
96 do
97 {
98 id = core::system::generateShortenedUuid();
99 } while (s_jobs.find(id) != s_jobs.end());
100 return addJob(id,
101 ::time(0), /*recorded*/
102 0, /*started*/
103 0, /*completed*/
104 name, status, group, progress, confirmTermination, state, type,
105 "" /*cluster*/,
106 autoRemove, actions, cppActions, show,
107 true, /*saveOutput*/
108 tags);
109 }
110
addJob(const std::string & id,time_t recorded,time_t started,time_t completed,const std::string & name,const std::string & status,const std::string & group,int progress,bool confirmTermination,JobState state,JobType type,const std::string & cluster,bool autoRemove,SEXP actions,JobActions cppActions,bool show,bool saveOutput,std::vector<std::string> tags)111 boost::shared_ptr<Job> addJob(
112 const std::string& id,
113 time_t recorded,
114 time_t started,
115 time_t completed,
116 const std::string& name,
117 const std::string& status,
118 const std::string& group,
119 int progress,
120 bool confirmTermination,
121 JobState state,
122 JobType type,
123 const std::string& cluster,
124 bool autoRemove,
125 SEXP actions,
126 JobActions cppActions,
127 bool show,
128 bool saveOutput,
129 std::vector<std::string> tags)
130 {
131 // create the job!
132 boost::shared_ptr<Job> pJob = boost::make_shared<Job>(
133 id, recorded, started, completed, name, status, group, 0 /* completed units */,
134 progress, confirmTermination, state, type, cluster, autoRemove, actions, cppActions, show, saveOutput, tags);
135
136 // cache job and notify client
137 s_jobs[id] = pJob;
138 notifyClient(JobAdded, pJob);
139
140 return pJob;
141 }
142
setJobProgress(boost::shared_ptr<Job> pJob,int units)143 void setJobProgress(boost::shared_ptr<Job> pJob, int units)
144 {
145 pJob->setProgress(units);
146 processUpdate(pJob);
147 }
148
setProgressMax(boost::shared_ptr<Job> pJob,int max)149 void setProgressMax(boost::shared_ptr<Job> pJob, int max)
150 {
151 pJob->setProgressMax(max);
152 processUpdate(pJob);
153 }
154
setJobState(boost::shared_ptr<Job> pJob,JobState state)155 void setJobState(boost::shared_ptr<Job> pJob, JobState state)
156 {
157 pJob->setState(state);
158 processUpdate(pJob);
159 }
160
setJobStatus(boost::shared_ptr<Job> pJob,const std::string & status)161 void setJobStatus(boost::shared_ptr<Job> pJob, const std::string& status)
162 {
163 pJob->setStatus(status);
164 processUpdate(pJob);
165 }
166
jobsAsJson()167 json::Object jobsAsJson()
168 {
169 json::Object jobs;
170
171 // convert all jobs to json
172 for (auto& job: s_jobs)
173 {
174 jobs[job.first] = job.second->toJson();
175 }
176
177 return jobs;
178 }
179
removeAllJobs()180 void removeAllJobs()
181 {
182 for (auto& job: s_jobs)
183 {
184 job.second->cleanup();
185 notifyClient(JobRemoved, job.second);
186 }
187 s_jobs.clear();
188 }
189
removeAllLocalJobs()190 void removeAllLocalJobs()
191 {
192 for (auto it = s_jobs.cbegin(); it != s_jobs.cend() ; )
193 {
194 if (it->second->type() == JobType::JobTypeSession)
195 {
196 it->second->cleanup();
197 notifyClient(JobRemoved, it->second);
198 it = s_jobs.erase(it);
199 }
200 else
201 ++it;
202 }
203 }
204
removeAllLauncherJobs()205 void removeAllLauncherJobs()
206 {
207 for (auto it = s_jobs.cbegin(); it != s_jobs.cend() ; )
208 {
209 if (it->second->type() == JobType::JobTypeLauncher)
210 {
211 it->second->cleanup();
212 it = s_jobs.erase(it);
213 }
214 else
215 ++it;
216 }
217 }
218
removeCompletedLocalJobs()219 void removeCompletedLocalJobs()
220 {
221 // collect completed jobs
222 std::vector<boost::shared_ptr<Job> > completed;
223 for (auto& job: s_jobs)
224 {
225 if (job.second->complete() && job.second->type() == JobType::JobTypeSession)
226 completed.push_back(job.second);
227 }
228
229 // and remove them!
230 for (auto& job: completed)
231 {
232 removeJob(job);
233 }
234 }
235
endAllJobStreaming()236 void endAllJobStreaming()
237 {
238 for (auto& job: s_jobs)
239 {
240 job.second->setListening(false);
241 }
242 }
243
localJobsRunning()244 bool localJobsRunning()
245 {
246 for (auto& job: s_jobs)
247 {
248 if (job.second->type() == JobType::JobTypeSession && !job.second->complete())
249 {
250 return true;
251 }
252 }
253 return false;
254 }
255
256 } // namespace jobs
257 } // namespace modules
258 } // namespace session
259 } // namespace rstudio
260
261