1 /*
2  * JobsApi.cpp
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 
16 #include <session/jobs/JobsApi.hpp>
17 #include <session/SessionModuleContext.hpp>
18 
// Kind of change being reported to the client for a job. The numeric value is
// what notifyClient() places in the "type" field of the event payload, so the
// values must stay stable.
enum JobUpdateType
{
   JobAdded = 0,   // 0: a job was added
   JobUpdated,     // 1: an existing job changed
   JobRemoved      // 2: a job was removed
};
25 
26 using namespace rstudio::core;
27 
28 namespace rstudio {
29 namespace session {
30 namespace modules {
31 namespace jobs {
32 
33 namespace {
34 
// All jobs currently tracked by this module, keyed by job ID. Entries are
// inserted by addJob() and erased by the various remove*() functions below.
std::map<std::string, boost::shared_ptr<Job> > s_jobs;
37 
38 // notify client that job has been updated
notifyClient(JobUpdateType update,boost::shared_ptr<Job> pJob)39 void notifyClient(JobUpdateType update, boost::shared_ptr<Job> pJob)
40 {
41    json::Object data;
42    data["type"] = static_cast<int>(update);
43    data["job"]  = pJob->toJson();
44    module_context::enqueClientEvent(
45          ClientEvent(client_events::kJobUpdated, data));
46 }
47 
processUpdate(boost::shared_ptr<Job> pJob)48 void processUpdate(boost::shared_ptr<Job> pJob)
49 {
50    if (pJob->complete() && pJob->autoRemove())
51    {
52       // if this job is now complete, and the job wants to be removed when complete, remove it
53       removeJob(pJob);
54    }
55    else
56    {
57       // otherwise, notify the client of the changes in the job
58       notifyClient(JobUpdated, pJob);
59    }
60 }
61 
62 } // anonymous namespace
63 
removeJob(boost::shared_ptr<Job> pJob)64 void removeJob(boost::shared_ptr<Job> pJob)
65 {
66    notifyClient(JobRemoved, pJob);
67    pJob->cleanup();
68    s_jobs.erase(s_jobs.find(pJob->id()));
69 }
70 
lookupJob(const std::string & id,boost::shared_ptr<Job> * pJob)71 bool lookupJob(const std::string& id, boost::shared_ptr<Job> *pJob)
72 {
73    auto it = s_jobs.find(id);
74    if (it == s_jobs.end())
75       return false;
76    *pJob = it->second;
77    return true;
78 }
79 
addJob(const std::string & name,const std::string & status,const std::string & group,int progress,bool confirmTermination,JobState state,JobType type,bool autoRemove,SEXP actions,JobActions cppActions,bool show,std::vector<std::string> tags)80 boost::shared_ptr<Job> addJob(
81       const std::string& name,
82       const std::string& status,
83       const std::string& group,
84       int progress,
85       bool confirmTermination,
86       JobState state,
87       JobType type,
88       bool autoRemove,
89       SEXP actions,
90       JobActions cppActions,
91       bool show,
92       std::vector<std::string> tags)
93 {
94    // find an unused job id
95    std::string id;
96    do
97    {
98       id = core::system::generateShortenedUuid();
99    } while (s_jobs.find(id) != s_jobs.end());
100    return addJob(id,
101          ::time(0), /*recorded*/
102          0, /*started*/
103          0, /*completed*/
104          name, status, group, progress, confirmTermination, state, type,
105          "" /*cluster*/,
106          autoRemove, actions, cppActions, show,
107          true, /*saveOutput*/
108          tags);
109 }
110 
addJob(const std::string & id,time_t recorded,time_t started,time_t completed,const std::string & name,const std::string & status,const std::string & group,int progress,bool confirmTermination,JobState state,JobType type,const std::string & cluster,bool autoRemove,SEXP actions,JobActions cppActions,bool show,bool saveOutput,std::vector<std::string> tags)111 boost::shared_ptr<Job> addJob(
112       const std::string& id,
113       time_t recorded,
114       time_t started,
115       time_t completed,
116       const std::string& name,
117       const std::string& status,
118       const std::string& group,
119       int progress,
120       bool confirmTermination,
121       JobState state,
122       JobType type,
123       const std::string& cluster,
124       bool autoRemove,
125       SEXP actions,
126       JobActions cppActions,
127       bool show,
128       bool saveOutput,
129       std::vector<std::string> tags)
130 {
131    // create the job!
132    boost::shared_ptr<Job> pJob = boost::make_shared<Job>(
133          id, recorded, started, completed, name, status, group, 0 /* completed units */,
134          progress, confirmTermination, state, type, cluster, autoRemove, actions, cppActions, show, saveOutput, tags);
135 
136    // cache job and notify client
137    s_jobs[id] = pJob;
138    notifyClient(JobAdded, pJob);
139 
140    return pJob;
141 }
142 
setJobProgress(boost::shared_ptr<Job> pJob,int units)143 void setJobProgress(boost::shared_ptr<Job> pJob, int units)
144 {
145    pJob->setProgress(units);
146    processUpdate(pJob);
147 }
148 
setProgressMax(boost::shared_ptr<Job> pJob,int max)149 void setProgressMax(boost::shared_ptr<Job> pJob, int max)
150 {
151    pJob->setProgressMax(max);
152    processUpdate(pJob);
153 }
154 
setJobState(boost::shared_ptr<Job> pJob,JobState state)155 void setJobState(boost::shared_ptr<Job> pJob, JobState state)
156 {
157    pJob->setState(state);
158    processUpdate(pJob);
159 }
160 
setJobStatus(boost::shared_ptr<Job> pJob,const std::string & status)161 void setJobStatus(boost::shared_ptr<Job> pJob, const std::string& status)
162 {
163    pJob->setStatus(status);
164    processUpdate(pJob);
165 }
166 
jobsAsJson()167 json::Object jobsAsJson()
168 {
169    json::Object jobs;
170 
171    // convert all jobs to json
172    for (auto& job: s_jobs)
173    {
174       jobs[job.first] = job.second->toJson();
175    }
176 
177    return jobs;
178 }
179 
removeAllJobs()180 void removeAllJobs()
181 {
182    for (auto& job: s_jobs)
183    {
184       job.second->cleanup();
185       notifyClient(JobRemoved, job.second);
186    }
187    s_jobs.clear();
188 }
189 
removeAllLocalJobs()190 void removeAllLocalJobs()
191 {
192    for (auto it = s_jobs.cbegin(); it != s_jobs.cend() ; )
193    {
194       if (it->second->type() == JobType::JobTypeSession)
195       {
196          it->second->cleanup();
197          notifyClient(JobRemoved, it->second);
198          it = s_jobs.erase(it);
199       }
200       else
201          ++it;
202    }
203 }
204 
removeAllLauncherJobs()205 void removeAllLauncherJobs()
206 {
207    for (auto it = s_jobs.cbegin(); it != s_jobs.cend() ; )
208    {
209       if (it->second->type() == JobType::JobTypeLauncher)
210       {
211          it->second->cleanup();
212          it = s_jobs.erase(it);
213       }
214       else
215          ++it;
216    }
217 }
218 
removeCompletedLocalJobs()219 void removeCompletedLocalJobs()
220 {
221    // collect completed jobs
222    std::vector<boost::shared_ptr<Job> > completed;
223    for (auto& job: s_jobs)
224    {
225       if (job.second->complete() && job.second->type() == JobType::JobTypeSession)
226          completed.push_back(job.second);
227    }
228 
229    // and remove them!
230    for (auto& job: completed)
231    {
232       removeJob(job);
233    }
234 }
235 
endAllJobStreaming()236 void endAllJobStreaming()
237 {
238    for (auto& job: s_jobs)
239    {
240       job.second->setListening(false);
241    }
242 }
243 
localJobsRunning()244 bool localJobsRunning()
245 {
246    for (auto& job: s_jobs)
247    {
248       if (job.second->type() == JobType::JobTypeSession && !job.second->complete())
249       {
250          return true;
251       }
252    }
253    return false;
254 }
255 
256 } // namespace jobs
257 } // namespace modules
258 } // namespace session
259 } // namespace rstudio
260 
261