1 /******************************************************************************
2 
3   This source file is part of the MoleQueue project.
4 
5   Copyright 2011-2012 Kitware, Inc.
6 
7   This source code is released under the New BSD License, (the "License").
8 
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14 
15 ******************************************************************************/
16 
17 #include "remote.h"
18 
19 #include "../filesystemtools.h"
20 #include "../job.h"
21 #include "../jobmanager.h"
22 #include "../logentry.h"
23 #include "../logger.h"
24 #include "../program.h"
25 #include "../remotequeuewidget.h"
26 #include "../server.h"
27 
28 #include <qjsondocument.h>
29 
30 #include <QtCore/QTimer>
31 #include <QtCore/QDebug>
32 
33 #include <QtGui>
34 
35 namespace MoleQueue {
36 
QueueRemote(const QString & queueName,QueueManager * parentObject)37 QueueRemote::QueueRemote(const QString &queueName, QueueManager *parentObject)
38   : Queue(queueName, parentObject),
39     m_checkForPendingJobsTimerId(-1),
40     m_queueUpdateInterval(DEFAULT_REMOTE_QUEUE_UPDATE_INTERVAL),
41     m_defaultMaxWallTime(DEFAULT_MAX_WALLTIME)
42 {
43   // Set remote queue check timer.
44   m_checkQueueTimerId = startTimer(m_queueUpdateInterval * 60000);
45 
46   // Check for jobs to submit every 5 seconds
47   m_checkForPendingJobsTimerId = startTimer(5000);
48 }
49 
~QueueRemote()50 QueueRemote::~QueueRemote()
51 {
52 }
53 
writeJsonSettings(QJsonObject & json,bool exportOnly,bool includePrograms) const54 bool QueueRemote::writeJsonSettings(QJsonObject &json, bool exportOnly,
55                                     bool includePrograms) const
56 {
57   if (!Queue::writeJsonSettings(json, exportOnly, includePrograms))
58     return false;
59 
60   if (!exportOnly)
61     json.insert("workingDirectoryBase", m_workingDirectoryBase);
62 
63   json.insert("queueUpdateInterval",
64               static_cast<double>(m_queueUpdateInterval));
65   json.insert("defaultMaxWallTime",
66               static_cast<double>(m_defaultMaxWallTime));
67 
68   return true;
69 }
70 
readJsonSettings(const QJsonObject & json,bool importOnly,bool includePrograms)71 bool QueueRemote::readJsonSettings(const QJsonObject &json, bool importOnly,
72                                    bool includePrograms)
73 {
74   // Validate JSON:
75   if ((!importOnly && !json.value("workingDirectoryBase").isString()) ||
76       !json.value("queueUpdateInterval").isDouble() ||
77       !json.value("defaultMaxWallTime").isDouble()) {
78     Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
79                      .arg(QString(QJsonDocument(json).toJson())));
80     return false;
81   }
82 
83   if (!Queue::readJsonSettings(json, importOnly, includePrograms))
84     return false;
85 
86   if (!importOnly)
87     m_workingDirectoryBase = json.value("workingDirectoryBase").toString();
88 
89   m_queueUpdateInterval =
90       static_cast<int>(json.value("queueUpdateInterval").toDouble() + 0.5);
91   m_defaultMaxWallTime =
92       static_cast<int>(json.value("defaultMaxWallTime").toDouble() + 0.5);
93 
94   return true;
95 }
96 
setQueueUpdateInterval(int interval)97 void QueueRemote::setQueueUpdateInterval(int interval)
98 {
99   if (interval == m_queueUpdateInterval)
100     return;
101 
102   m_queueUpdateInterval = interval;
103 
104   killTimer(m_checkQueueTimerId);
105   m_checkQueueTimerId = startTimer(m_queueUpdateInterval * 60000);
106   requestQueueUpdate();
107 }
108 
replaceKeywords(QString & launchScript,const Job & job,bool addNewline)109 void QueueRemote::replaceKeywords(QString &launchScript,
110                                   const Job &job, bool addNewline)
111 {
112   // If a valid walltime is set, replace all occurances with the appropriate
113   // string:
114   int wallTime = job.maxWallTime();
115   int hours = wallTime / 60;
116   int minutes = wallTime % 60;
117   if (wallTime > 0) {
118     launchScript.replace("$$$maxWallTime$$$",
119                          QString("%1:%2:00")
120                          .arg(hours, 2, 10, QChar('0'))
121                          .arg(minutes, 2, 10, QChar('0')));
122   }
123   // Otherwise, erase all lines containing the keyword
124   else {
125     QRegExp expr("\\n[^\\n]*\\${3,3}maxWallTime\\${3,3}[^\\n]*\\n");
126     launchScript.replace(expr, "\n");
127   }
128 
129   if (wallTime <= 0) {
130     wallTime = defaultMaxWallTime();
131     hours = wallTime / 60;
132     minutes = wallTime % 60;
133   }
134 
135   launchScript.replace("$$maxWallTime$$",
136                        QString("%1:%2:00")
137                        .arg(hours, 2, 10, QChar('0'))
138                        .arg(minutes, 2, 10, QChar('0')));
139 
140   Queue::replaceKeywords(launchScript, job, addNewline);
141 }
142 
submitJob(Job job)143 bool QueueRemote::submitJob(Job job)
144 {
145   if (job.isValid()) {
146     m_pendingSubmission.append(job.moleQueueId());
147     job.setJobState(MoleQueue::Accepted);
148     return true;
149   }
150   Logger::logError(tr("Refusing to submit job to Queue '%1': Job object is "
151                       "invalid.").arg(m_name), job.moleQueueId());
152   return false;
153 }
154 
killJob(Job job)155 void QueueRemote::killJob(Job job)
156 {
157   if (!job.isValid())
158     return;
159 
160   int pendingIndex = m_pendingSubmission.indexOf(job.moleQueueId());
161   if (pendingIndex >= 0) {
162     m_pendingSubmission.removeAt(pendingIndex);
163     job.setJobState(MoleQueue::Canceled);
164     return;
165   }
166 
167   if (job.queue() == m_name && job.queueId() != InvalidId &&
168       m_jobs.value(job.queueId()) == job.moleQueueId()) {
169     m_jobs.remove(job.queueId());
170     beginKillJob(job);
171     return;
172   }
173 
174   Logger::logWarning(tr("Queue '%1' requested to kill unknown job that belongs "
175                         "to queue '%2', queue id '%3'.").arg(m_name)
176                      .arg(job.queue()).arg(idTypeToString(job.queueId())),
177                      job.moleQueueId());
178   job.setJobState(MoleQueue::Canceled);
179 }
180 
submitPendingJobs()181 void QueueRemote::submitPendingJobs()
182 {
183   if (m_pendingSubmission.isEmpty())
184     return;
185 
186   // lookup job manager:
187   JobManager *jobManager = NULL;
188   if (m_server)
189     jobManager = m_server->jobManager();
190 
191   if (!jobManager) {
192     Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
193                      .arg("Cannot locate server JobManager!"));
194     return;
195   }
196 
197   foreach (const IdType moleQueueId, m_pendingSubmission) {
198     Job job = jobManager->lookupJobByMoleQueueId(moleQueueId);
199     // Kick off the submission process...
200     beginJobSubmission(job);
201   }
202 
203   m_pendingSubmission.clear();
204 }
205 
beginJobSubmission(Job job)206 void QueueRemote::beginJobSubmission(Job job)
207 {
208   if (!writeInputFiles(job)) {
209     Logger::logError(tr("Error while writing input files."), job.moleQueueId());
210     job.setJobState(Error);
211     return;
212   }
213   // Attempt to copy the files via scp first. Only call mkdir on the remote
214   // working directory if the scp call fails.
215   copyInputFilesToHost(job);
216 }
217 
beginFinalizeJob(IdType queueId)218 void QueueRemote::beginFinalizeJob(IdType queueId)
219 {
220   IdType moleQueueId = m_jobs.value(queueId, InvalidId);
221   if (moleQueueId == InvalidId)
222     return;
223 
224   m_jobs.remove(queueId);
225 
226   // Lookup job
227   if (!m_server)
228     return;
229   Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
230   if (!job.isValid())
231     return;
232 
233   finalizeJobCopyFromServer(job);
234 }
235 
finalizeJobCopyToCustomDestination(Job job)236 void QueueRemote::finalizeJobCopyToCustomDestination(Job job)
237 {
238   // Skip to next step if needed
239   if (job.outputDirectory().isEmpty() ||
240       job.outputDirectory() == job.localWorkingDirectory()) {
241     finalizeJobCleanup(job);
242     return;
243   }
244 
245   // The copy function will throw errors if needed.
246   if (!FileSystemTools::recursiveCopyDirectory(job.localWorkingDirectory(),
247                                                job.outputDirectory())) {
248     Logger::logError(tr("Cannot copy '%1' -> '%2'.")
249                      .arg(job.localWorkingDirectory(),
250                           job.outputDirectory()), job.moleQueueId());
251     job.setJobState(MoleQueue::Error);
252     return;
253   }
254 
255   finalizeJobCleanup(job);
256 }
257 
finalizeJobCleanup(Job job)258 void QueueRemote::finalizeJobCleanup(Job job)
259 {
260   if (job.cleanLocalWorkingDirectory())
261     cleanLocalDirectory(job);
262 
263   if (job.cleanRemoteFiles())
264     cleanRemoteDirectory(job);
265 
266   job.setJobState(MoleQueue::Finished);
267 }
268 
269 
jobAboutToBeRemoved(const Job & job)270 void QueueRemote::jobAboutToBeRemoved(const Job &job)
271 {
272   m_pendingSubmission.removeOne(job.moleQueueId());
273   Queue::jobAboutToBeRemoved(job);
274 }
275 
removeStaleJobs()276 void QueueRemote::removeStaleJobs()
277 {
278   if (m_server) {
279     if (JobManager *jobManager = m_server->jobManager()) {
280       QList<IdType> staleQueueIds;
281       for (QMap<IdType, IdType>::const_iterator it = m_jobs.constBegin(),
282            it_end = m_jobs.constEnd(); it != it_end; ++it) {
283         if (jobManager->lookupJobByMoleQueueId(it.value()).isValid())
284           continue;
285         staleQueueIds << it.key();
286         Logger::logError(tr("Job with MoleQueue id %1 is missing, but the Queue"
287                             " '%2' is still holding a reference to it. Please "
288                             "report this bug and check if the job needs to be "
289                             "resubmitted.").arg(it.value()).arg(name()),
290                          it.value());
291       }
292       foreach (IdType queueId, staleQueueIds)
293         m_jobs.remove(queueId);
294     }
295   }
296 }
297 
timerEvent(QTimerEvent * theEvent)298 void QueueRemote::timerEvent(QTimerEvent *theEvent)
299 {
300   if (theEvent->timerId() == m_checkQueueTimerId) {
301     theEvent->accept();
302     removeStaleJobs();
303     if (!m_jobs.isEmpty())
304       requestQueueUpdate();
305     return;
306   }
307   else if (theEvent->timerId() == m_checkForPendingJobsTimerId) {
308     theEvent->accept();
309     submitPendingJobs();
310     return;
311   }
312 
313   QObject::timerEvent(theEvent);
314 }
315 
316 } // End namespace
317