1 /******************************************************************************
2
3 This source file is part of the MoleQueue project.
4
5 Copyright 2011-2012 Kitware, Inc.
6
7 This source code is released under the New BSD License, (the "License").
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14
15 ******************************************************************************/
16
17 #include "remote.h"
18
19 #include "../filesystemtools.h"
20 #include "../job.h"
21 #include "../jobmanager.h"
22 #include "../logentry.h"
23 #include "../logger.h"
24 #include "../program.h"
25 #include "../remotequeuewidget.h"
26 #include "../server.h"
27
28 #include <qjsondocument.h>
29
30 #include <QtCore/QTimer>
31 #include <QtCore/QDebug>
32
33 #include <QtGui>
34
35 namespace MoleQueue {
36
QueueRemote(const QString & queueName,QueueManager * parentObject)37 QueueRemote::QueueRemote(const QString &queueName, QueueManager *parentObject)
38 : Queue(queueName, parentObject),
39 m_checkForPendingJobsTimerId(-1),
40 m_queueUpdateInterval(DEFAULT_REMOTE_QUEUE_UPDATE_INTERVAL),
41 m_defaultMaxWallTime(DEFAULT_MAX_WALLTIME)
42 {
43 // Set remote queue check timer.
44 m_checkQueueTimerId = startTimer(m_queueUpdateInterval * 60000);
45
46 // Check for jobs to submit every 5 seconds
47 m_checkForPendingJobsTimerId = startTimer(5000);
48 }
49
~QueueRemote()50 QueueRemote::~QueueRemote()
51 {
52 }
53
writeJsonSettings(QJsonObject & json,bool exportOnly,bool includePrograms) const54 bool QueueRemote::writeJsonSettings(QJsonObject &json, bool exportOnly,
55 bool includePrograms) const
56 {
57 if (!Queue::writeJsonSettings(json, exportOnly, includePrograms))
58 return false;
59
60 if (!exportOnly)
61 json.insert("workingDirectoryBase", m_workingDirectoryBase);
62
63 json.insert("queueUpdateInterval",
64 static_cast<double>(m_queueUpdateInterval));
65 json.insert("defaultMaxWallTime",
66 static_cast<double>(m_defaultMaxWallTime));
67
68 return true;
69 }
70
readJsonSettings(const QJsonObject & json,bool importOnly,bool includePrograms)71 bool QueueRemote::readJsonSettings(const QJsonObject &json, bool importOnly,
72 bool includePrograms)
73 {
74 // Validate JSON:
75 if ((!importOnly && !json.value("workingDirectoryBase").isString()) ||
76 !json.value("queueUpdateInterval").isDouble() ||
77 !json.value("defaultMaxWallTime").isDouble()) {
78 Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
79 .arg(QString(QJsonDocument(json).toJson())));
80 return false;
81 }
82
83 if (!Queue::readJsonSettings(json, importOnly, includePrograms))
84 return false;
85
86 if (!importOnly)
87 m_workingDirectoryBase = json.value("workingDirectoryBase").toString();
88
89 m_queueUpdateInterval =
90 static_cast<int>(json.value("queueUpdateInterval").toDouble() + 0.5);
91 m_defaultMaxWallTime =
92 static_cast<int>(json.value("defaultMaxWallTime").toDouble() + 0.5);
93
94 return true;
95 }
96
setQueueUpdateInterval(int interval)97 void QueueRemote::setQueueUpdateInterval(int interval)
98 {
99 if (interval == m_queueUpdateInterval)
100 return;
101
102 m_queueUpdateInterval = interval;
103
104 killTimer(m_checkQueueTimerId);
105 m_checkQueueTimerId = startTimer(m_queueUpdateInterval * 60000);
106 requestQueueUpdate();
107 }
108
replaceKeywords(QString & launchScript,const Job & job,bool addNewline)109 void QueueRemote::replaceKeywords(QString &launchScript,
110 const Job &job, bool addNewline)
111 {
112 // If a valid walltime is set, replace all occurances with the appropriate
113 // string:
114 int wallTime = job.maxWallTime();
115 int hours = wallTime / 60;
116 int minutes = wallTime % 60;
117 if (wallTime > 0) {
118 launchScript.replace("$$$maxWallTime$$$",
119 QString("%1:%2:00")
120 .arg(hours, 2, 10, QChar('0'))
121 .arg(minutes, 2, 10, QChar('0')));
122 }
123 // Otherwise, erase all lines containing the keyword
124 else {
125 QRegExp expr("\\n[^\\n]*\\${3,3}maxWallTime\\${3,3}[^\\n]*\\n");
126 launchScript.replace(expr, "\n");
127 }
128
129 if (wallTime <= 0) {
130 wallTime = defaultMaxWallTime();
131 hours = wallTime / 60;
132 minutes = wallTime % 60;
133 }
134
135 launchScript.replace("$$maxWallTime$$",
136 QString("%1:%2:00")
137 .arg(hours, 2, 10, QChar('0'))
138 .arg(minutes, 2, 10, QChar('0')));
139
140 Queue::replaceKeywords(launchScript, job, addNewline);
141 }
142
submitJob(Job job)143 bool QueueRemote::submitJob(Job job)
144 {
145 if (job.isValid()) {
146 m_pendingSubmission.append(job.moleQueueId());
147 job.setJobState(MoleQueue::Accepted);
148 return true;
149 }
150 Logger::logError(tr("Refusing to submit job to Queue '%1': Job object is "
151 "invalid.").arg(m_name), job.moleQueueId());
152 return false;
153 }
154
killJob(Job job)155 void QueueRemote::killJob(Job job)
156 {
157 if (!job.isValid())
158 return;
159
160 int pendingIndex = m_pendingSubmission.indexOf(job.moleQueueId());
161 if (pendingIndex >= 0) {
162 m_pendingSubmission.removeAt(pendingIndex);
163 job.setJobState(MoleQueue::Canceled);
164 return;
165 }
166
167 if (job.queue() == m_name && job.queueId() != InvalidId &&
168 m_jobs.value(job.queueId()) == job.moleQueueId()) {
169 m_jobs.remove(job.queueId());
170 beginKillJob(job);
171 return;
172 }
173
174 Logger::logWarning(tr("Queue '%1' requested to kill unknown job that belongs "
175 "to queue '%2', queue id '%3'.").arg(m_name)
176 .arg(job.queue()).arg(idTypeToString(job.queueId())),
177 job.moleQueueId());
178 job.setJobState(MoleQueue::Canceled);
179 }
180
submitPendingJobs()181 void QueueRemote::submitPendingJobs()
182 {
183 if (m_pendingSubmission.isEmpty())
184 return;
185
186 // lookup job manager:
187 JobManager *jobManager = NULL;
188 if (m_server)
189 jobManager = m_server->jobManager();
190
191 if (!jobManager) {
192 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
193 .arg("Cannot locate server JobManager!"));
194 return;
195 }
196
197 foreach (const IdType moleQueueId, m_pendingSubmission) {
198 Job job = jobManager->lookupJobByMoleQueueId(moleQueueId);
199 // Kick off the submission process...
200 beginJobSubmission(job);
201 }
202
203 m_pendingSubmission.clear();
204 }
205
beginJobSubmission(Job job)206 void QueueRemote::beginJobSubmission(Job job)
207 {
208 if (!writeInputFiles(job)) {
209 Logger::logError(tr("Error while writing input files."), job.moleQueueId());
210 job.setJobState(Error);
211 return;
212 }
213 // Attempt to copy the files via scp first. Only call mkdir on the remote
214 // working directory if the scp call fails.
215 copyInputFilesToHost(job);
216 }
217
beginFinalizeJob(IdType queueId)218 void QueueRemote::beginFinalizeJob(IdType queueId)
219 {
220 IdType moleQueueId = m_jobs.value(queueId, InvalidId);
221 if (moleQueueId == InvalidId)
222 return;
223
224 m_jobs.remove(queueId);
225
226 // Lookup job
227 if (!m_server)
228 return;
229 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
230 if (!job.isValid())
231 return;
232
233 finalizeJobCopyFromServer(job);
234 }
235
finalizeJobCopyToCustomDestination(Job job)236 void QueueRemote::finalizeJobCopyToCustomDestination(Job job)
237 {
238 // Skip to next step if needed
239 if (job.outputDirectory().isEmpty() ||
240 job.outputDirectory() == job.localWorkingDirectory()) {
241 finalizeJobCleanup(job);
242 return;
243 }
244
245 // The copy function will throw errors if needed.
246 if (!FileSystemTools::recursiveCopyDirectory(job.localWorkingDirectory(),
247 job.outputDirectory())) {
248 Logger::logError(tr("Cannot copy '%1' -> '%2'.")
249 .arg(job.localWorkingDirectory(),
250 job.outputDirectory()), job.moleQueueId());
251 job.setJobState(MoleQueue::Error);
252 return;
253 }
254
255 finalizeJobCleanup(job);
256 }
257
finalizeJobCleanup(Job job)258 void QueueRemote::finalizeJobCleanup(Job job)
259 {
260 if (job.cleanLocalWorkingDirectory())
261 cleanLocalDirectory(job);
262
263 if (job.cleanRemoteFiles())
264 cleanRemoteDirectory(job);
265
266 job.setJobState(MoleQueue::Finished);
267 }
268
269
jobAboutToBeRemoved(const Job & job)270 void QueueRemote::jobAboutToBeRemoved(const Job &job)
271 {
272 m_pendingSubmission.removeOne(job.moleQueueId());
273 Queue::jobAboutToBeRemoved(job);
274 }
275
removeStaleJobs()276 void QueueRemote::removeStaleJobs()
277 {
278 if (m_server) {
279 if (JobManager *jobManager = m_server->jobManager()) {
280 QList<IdType> staleQueueIds;
281 for (QMap<IdType, IdType>::const_iterator it = m_jobs.constBegin(),
282 it_end = m_jobs.constEnd(); it != it_end; ++it) {
283 if (jobManager->lookupJobByMoleQueueId(it.value()).isValid())
284 continue;
285 staleQueueIds << it.key();
286 Logger::logError(tr("Job with MoleQueue id %1 is missing, but the Queue"
287 " '%2' is still holding a reference to it. Please "
288 "report this bug and check if the job needs to be "
289 "resubmitted.").arg(it.value()).arg(name()),
290 it.value());
291 }
292 foreach (IdType queueId, staleQueueIds)
293 m_jobs.remove(queueId);
294 }
295 }
296 }
297
timerEvent(QTimerEvent * theEvent)298 void QueueRemote::timerEvent(QTimerEvent *theEvent)
299 {
300 if (theEvent->timerId() == m_checkQueueTimerId) {
301 theEvent->accept();
302 removeStaleJobs();
303 if (!m_jobs.isEmpty())
304 requestQueueUpdate();
305 return;
306 }
307 else if (theEvent->timerId() == m_checkForPendingJobsTimerId) {
308 theEvent->accept();
309 submitPendingJobs();
310 return;
311 }
312
313 QObject::timerEvent(theEvent);
314 }
315
316 } // End namespace
317