1 /******************************************************************************
2 
3   This source file is part of the MoleQueue project.
4 
5   Copyright 2012 Kitware, Inc.
6 
7   This source code is released under the New BSD License, (the "License").
8 
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14 
15 ******************************************************************************/
16 
17 #include "local.h"
18 
19 #include "../filesystemtools.h"
20 #include "../job.h"
21 #include "../jobmanager.h"
22 #include "../localqueuewidget.h"
23 #include "../logentry.h"
24 #include "../logger.h"
25 #include "../program.h"
26 #include "../queue.h"
27 #include "../queuemanager.h"
28 #include "../server.h"
29 
30 #include <qjsonarray.h>
31 #include <qjsondocument.h>
32 
33 #include <QtCore/QProcess>
34 #include <QtCore/QProcessEnvironment>
35 
36 #include <QtCore/QDir>
37 #include <QtCore/QFile>
38 #include <QtCore/QProcess>
39 #include <QtCore/QTimerEvent>
40 #include <QtCore/QThread> // For ideal thread count
41 
42 #include <QtWidgets/QFormLayout>
43 #include <QtWidgets/QSpinBox>
44 
45 #include <QtCore/QDebug>
46 
47 #ifdef _WIN32
48 #include <Windows.h> // For _PROCESS_INFORMATION (PID parsing)
49 #endif
50 
51 namespace MoleQueue {
52 
QueueLocal(QueueManager * parentManager)53 QueueLocal::QueueLocal(QueueManager *parentManager) :
54   Queue("Local", parentManager),
55   m_checkJobLimitTimerId(-1),
56   m_cores(-1)
57 {
58 #ifdef _WIN32
59   m_launchTemplate = "@echo off\n\n$$programExecution$$\n";
60   m_launchScriptName = "MoleQueueLauncher.bat";
61 #else // WIN32
62   m_launchTemplate = "#!/bin/bash\n\n$$programExecution$$\n";
63   m_launchScriptName = "MoleQueueLauncher.sh";
64 #endif // WIN32
65 
66   // Check if new jobs need starting every 100 ms
67   m_checkJobLimitTimerId = startTimer(100);
68 }
69 
~QueueLocal()70 QueueLocal::~QueueLocal()
71 {
72 }
73 
writeJsonSettings(QJsonObject & json,bool exportOnly,bool includePrograms) const74 bool QueueLocal::writeJsonSettings(QJsonObject &json, bool exportOnly,
75                                    bool includePrograms) const
76 {
77   if (!Queue::writeJsonSettings(json, exportOnly, includePrograms))
78     return false;
79 
80   json.insert("cores", static_cast<double>(m_cores));
81 
82   if (!exportOnly) {
83     QJsonArray jobsToResumeArray;
84     foreach (IdType jobId, m_runningJobs.keys())
85       jobsToResumeArray.append(idTypeToJson(jobId));
86     foreach (IdType jobId, m_pendingJobQueue)
87       jobsToResumeArray.append(idTypeToJson(jobId));
88     json.insert("jobsToResume", jobsToResumeArray);
89   }
90 
91   return true;
92 }
93 
readJsonSettings(const QJsonObject & json,bool importOnly,bool includePrograms)94 bool QueueLocal::readJsonSettings(const QJsonObject &json, bool importOnly,
95                                   bool includePrograms)
96 {
97   // Validate JSON
98   if (!json.value("cores").isDouble()) {
99     Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
100                      .arg(QString(QJsonDocument(json).toJson())));
101     return false;
102   }
103 
104   QList<IdType> jobsToResume;
105   if (!importOnly && json.contains("jobsToResume")) {
106     if (!json.value("jobsToResume").isArray()) {
107       Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
108                        .arg(QString(QJsonDocument(json).toJson())));
109       return false;
110     }
111 
112     QJsonArray jobsToResumeArray = json.value("jobsToResume").toArray();
113     foreach (QJsonValue val, jobsToResumeArray) {
114       if (!val.isDouble()) {
115         Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
116                          .arg(QString(QJsonDocument(json).toJson())));
117         return false;
118       }
119       jobsToResume.append(toIdType(val));
120     }
121   }
122 
123   if (!Queue::readJsonSettings(json, importOnly, includePrograms))
124     return false;
125 
126   // Everything is validated -- go ahead and update object.
127   m_cores = static_cast<int>(json.value("cores").toDouble() + 0.5);
128   m_pendingJobQueue = jobsToResume;
129 
130   return true;
131 }
132 
settingsWidget()133 AbstractQueueSettingsWidget *QueueLocal::settingsWidget()
134 {
135   LocalQueueWidget *widget = new LocalQueueWidget(this);
136   return widget;
137 }
138 
submitJob(Job job)139 bool QueueLocal::submitJob(Job job)
140 {
141   if (job.isValid()) {
142     Job(job).setJobState(MoleQueue::Accepted);
143     return prepareJobForSubmission(job);;
144   }
145   Logger::logError(tr("Refusing to submit job to Queue '%1': Job object is "
146                       "invalid.").arg(m_name), job.moleQueueId());
147   return false;
148 }
149 
killJob(Job job)150 void QueueLocal::killJob(Job job)
151 {
152   if (!job.isValid())
153     return;
154 
155   int pendingIndex = m_pendingJobQueue.indexOf(job.moleQueueId());
156   if (pendingIndex >= 0) {
157     m_pendingJobQueue.removeAt(pendingIndex);
158     job.setJobState(MoleQueue::Canceled);
159     return;
160   }
161 
162   QProcess *process = m_runningJobs.take(job.moleQueueId());
163   if (process != NULL) {
164     m_jobs.remove(job.queueId());
165     process->disconnect(this);
166     process->terminate();
167     process->deleteLater();
168     job.setJobState(MoleQueue::Canceled);
169     return;
170   }
171 
172   job.setJobState(MoleQueue::Canceled);
173 }
174 
prepareJobForSubmission(Job & job)175 bool QueueLocal::prepareJobForSubmission(Job &job)
176 {
177   if (!writeInputFiles(job)) {
178     Logger::logError(tr("Error while writing input files."), job.moleQueueId());
179     job.setJobState(Error);
180     return false;
181   }
182   if (!addJobToQueue(job))
183     return false;
184 
185   return true;
186 }
187 
processStarted()188 void QueueLocal::processStarted()
189 {
190   QProcess *process = qobject_cast<QProcess*>(sender());
191   if (!process)
192     return;
193 
194   IdType moleQueueId = m_runningJobs.key(process, 0);
195   if (moleQueueId == 0)
196     return;
197 
198   IdType queueId;
199 #ifdef _WIN32
200   queueId = static_cast<IdType>(process->pid()->dwProcessId);
201 #else // WIN32
202   queueId = static_cast<IdType>(process->pid());
203 #endif // WIN32
204 
205   // Get pointer to jobmanager to lookup job
206   if (!m_server) {
207     Logger::logError(tr("Queue '%1' cannot locate Server instance!")
208                      .arg(m_name), moleQueueId);
209     return;
210   }
211   Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
212   if (!job.isValid()) {
213     Logger::logError(tr("Queue '%1' Cannot update invalid Job reference!")
214                      .arg(m_name), moleQueueId);
215     return;
216   }
217   job.setQueueId(queueId);
218   job.setJobState(MoleQueue::RunningLocal);
219 }
220 
processFinished(int exitCode,QProcess::ExitStatus exitStatus)221 void QueueLocal::processFinished(int exitCode, QProcess::ExitStatus exitStatus)
222 {
223   Q_UNUSED(exitCode);
224   Q_UNUSED(exitStatus);
225 
226   QProcess *process = qobject_cast<QProcess*>(sender());
227   if (!process)
228     return;
229 
230   IdType moleQueueId = m_runningJobs.key(process, 0);
231   if (moleQueueId == 0)
232     return;
233 
234   // Remove and delete QProcess from queue
235   m_runningJobs.take(moleQueueId)->deleteLater();
236 
237   // Get pointer to jobmanager to lookup job
238   if (!m_server) {
239     Logger::logError(tr("Queue '%1' cannot locate Server instance!")
240                      .arg(m_name), moleQueueId);
241     return;
242   }
243   Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
244   if (!job.isValid()) {
245     Logger::logDebugMessage(tr("Queue '%1' Cannot update invalid Job "
246                                "reference!").arg(m_name), moleQueueId);
247     return;
248   }
249 
250   if (!job.outputDirectory().isEmpty() &&
251       job.outputDirectory() != job.localWorkingDirectory()) {
252     // copy function logs errors if needed
253     if (!FileSystemTools::recursiveCopyDirectory(job.localWorkingDirectory(),
254                                                  job.outputDirectory())) {
255       Logger::logError(tr("Cannot copy '%1' -> '%2'.")
256                        .arg(job.localWorkingDirectory(),
257                             job.outputDirectory()), job.moleQueueId());
258       job.setJobState(MoleQueue::Error);
259       return;
260     }
261   }
262 
263   if (job.cleanLocalWorkingDirectory())
264     cleanLocalDirectory(job);
265 
266   job.setJobState(MoleQueue::Finished);
267 }
268 
maxNumberOfCores() const269 int QueueLocal::maxNumberOfCores() const
270 {
271   if (m_cores > 0)
272     return m_cores;
273   else
274     return QThread::idealThreadCount();
275 }
276 
277 
addJobToQueue(const Job & job)278 bool QueueLocal::addJobToQueue(const Job &job)
279 {
280   m_pendingJobQueue.append(job.moleQueueId());
281 
282   Job(job).setJobState(MoleQueue::QueuedLocal);
283 
284   return true;
285 }
286 
connectProcess(QProcess * proc)287 void QueueLocal::connectProcess(QProcess *proc)
288 {
289   connect(proc, SIGNAL(started()),
290           this, SLOT(processStarted()));
291   connect(proc, SIGNAL(finished(int,QProcess::ExitStatus)),
292           this, SLOT(processFinished(int,QProcess::ExitStatus)));
293   connect(proc, SIGNAL(error(QProcess::ProcessError)),
294           this, SLOT(processError(QProcess::ProcessError)));
295 }
296 
checkJobQueue()297 void QueueLocal::checkJobQueue()
298 {
299   if (m_pendingJobQueue.isEmpty())
300     return;
301 
302   int coresInUse = 0;
303   foreach(IdType moleQueueId, m_runningJobs.keys()) {
304     const Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
305     if (job.isValid())
306       coresInUse += job.numberOfCores();
307   }
308 
309   int totalCores = maxNumberOfCores();
310   int coresAvailable = totalCores - coresInUse;
311 
312   // Keep submitting jobs (FIFO) until we hit one we can't afford to start.
313   while (!m_pendingJobQueue.isEmpty() && coresAvailable > 0) {
314     IdType nextMQId = m_pendingJobQueue.first();
315     Job nextJob = m_server->jobManager()->lookupJobByMoleQueueId(nextMQId);
316     if (!nextJob.isValid()) {
317       m_pendingJobQueue.removeFirst();
318       continue;
319     }
320     else if (nextJob.numberOfCores() <= coresAvailable) {
321       m_pendingJobQueue.removeFirst();
322       if (startJob(nextJob.moleQueueId()))
323         coresAvailable -= nextJob.numberOfCores();
324       continue;
325     }
326 
327     // Cannot start next job yet!
328     break;
329   }
330 }
331 
startJob(IdType moleQueueId)332 bool QueueLocal::startJob(IdType moleQueueId)
333 {
334   // Get pointers to job, server, etc
335   if (!m_server) {
336     Logger::logError(tr("Queue '%1' cannot locate Server instance!")
337                      .arg(m_name), moleQueueId);
338     return false;
339   }
340   const Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
341   if (!job.isValid()) {
342     Logger::logError(tr("Queue '%1' cannot locate Job with MoleQueue id %2.")
343                      .arg(m_name).arg(idTypeToString(moleQueueId)),
344                      moleQueueId);
345     return false;
346   }
347   const Program *program = lookupProgram(job.program());
348   if (!program) {
349     Logger::logError(tr("Queue '%1' cannot locate Program '%2'.")
350                      .arg(m_name).arg(job.program()), moleQueueId);
351     return false;
352   }
353 
354   FileSpecification inputFileSpec(job.inputFile());
355 
356   // Create and setup process
357   QProcess *proc = new QProcess (this);
358   QDir dir (job.localWorkingDirectory());
359   proc->setWorkingDirectory(dir.absolutePath());
360 
361   QStringList arguments;
362   if (!program->arguments().isEmpty())
363     arguments << program->arguments();
364 
365   QString command;
366 
367   // Set default command. May be overwritten later.
368   command = program->executable();
369 
370   switch (program->launchSyntax()) {
371   case Program::CUSTOM:
372 #ifdef _WIN32
373     command = "cmd.exe /c " + launchScriptName();
374 #else // WIN32
375     command = "./" + launchScriptName();
376 #endif // WIN32
377     break;
378   case Program::PLAIN:
379     break;
380   case Program::INPUT_ARG:
381     arguments << inputFileSpec.filename();
382     break;
383   case Program::INPUT_ARG_NO_EXT:
384     arguments << inputFileSpec.fileBaseName();
385     break;
386   case Program::REDIRECT:
387   {
388     proc->setStandardInputFile(dir.absoluteFilePath(inputFileSpec.filename()));
389     QString outputFilename(program->outputFilename());
390     replaceKeywords(outputFilename, job, false);
391     proc->setStandardOutputFile(dir.absoluteFilePath(outputFilename));
392   }
393     break;
394   case Program::INPUT_ARG_OUTPUT_REDIRECT:
395   {
396     arguments << inputFileSpec.filename();
397     QString outputFilename(program->outputFilename());
398     replaceKeywords(outputFilename, job, false);
399     proc->setStandardOutputFile(dir.absoluteFilePath(outputFilename));
400   }
401     break;
402   case Program::SYNTAX_COUNT:
403   default:
404     Logger::logError(tr("Unknown launcher syntax for program %1: %2.")
405                      .arg(job.program()).arg(program->launchSyntax()),
406                      moleQueueId);
407     return false;
408   }
409 
410   connectProcess(proc);
411 
412   // Handle any keywords in the arguments
413   QString args = arguments.join(" ");
414   replaceKeywords(args, job, false);
415 
416   Logger::logNotification(tr("Executing '%1 %2' in %3", "command, args, dir")
417                           .arg(command).arg(args)
418                           .arg(proc->workingDirectory()),
419                           job.moleQueueId());
420   m_runningJobs.insert(job.moleQueueId(), proc);
421   proc->start(command + " " + args);
422 
423   return true;
424 }
425 
timerEvent(QTimerEvent * theEvent)426 void QueueLocal::timerEvent(QTimerEvent *theEvent)
427 {
428   if (theEvent->timerId() == m_checkJobLimitTimerId) {
429     checkJobQueue();
430     theEvent->accept();
431     return;
432   }
433 
434   QObject::timerEvent(theEvent);
435 }
436 
processError(QProcess::ProcessError error)437 void QueueLocal::processError(QProcess::ProcessError error)
438 {
439   QProcess *process = qobject_cast<QProcess*>(sender());
440   if (!process)
441     return;
442 
443   IdType moleQueueId = m_runningJobs.key(process, 0);
444   if (moleQueueId == 0)
445     return;
446 
447   // Remove and delete QProcess from queue
448   m_runningJobs.take(moleQueueId)->deleteLater();
449 
450   if (!m_server) {
451     Logger::logError(tr("Queue '%1' cannot locate Server instance!")
452                      .arg(m_name), moleQueueId);
453     return;
454   }
455 
456   Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
457   if (!job.isValid()) {
458     Logger::logDebugMessage(tr("Queue '%1' Cannot update invalid Job "
459                                "reference!").arg(m_name), moleQueueId);
460     return;
461   }
462 
463   QString errorString = QueueLocal::processErrorToString(error);
464   Logger::logError(tr("Execution of \'%1\' failed with process \'%2\': %3")
465                       .arg(job.program()).arg(errorString)
466                       .arg(process->errorString()), moleQueueId);
467 
468   job.setJobState(MoleQueue::Error);
469 }
470 
471 
472 /**
473  * Convert a ProcessError value to a string.
474  *
475  * @param error ProcessError
476  * @return C string
477  */
processErrorToString(QProcess::ProcessError error)478 QString QueueLocal::processErrorToString(QProcess::ProcessError error)
479 {
480   switch(error)
481   {
482   case QProcess::FailedToStart:
483     return tr("Failed to start");
484   case QProcess::Crashed:
485     return tr("Crashed");
486   case QProcess::Timedout:
487     return tr("Timed out");
488   case QProcess::WriteError:
489     return tr("Write error");
490   case QProcess::ReadError:
491     return tr("Read error");
492   case QProcess::UnknownError:
493     return tr("Unknown error");
494   }
495 
496   Logger::logError(tr("Unrecognized Process Error: %1").arg(error));
497 
498   return tr("Unrecognized process error");
499 }
500 
501 } // End namespace
502