1 /******************************************************************************
2
3 This source file is part of the MoleQueue project.
4
5 Copyright 2012 Kitware, Inc.
6
7 This source code is released under the New BSD License, (the "License").
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14
15 ******************************************************************************/
16
17 #include "local.h"
18
19 #include "../filesystemtools.h"
20 #include "../job.h"
21 #include "../jobmanager.h"
22 #include "../localqueuewidget.h"
23 #include "../logentry.h"
24 #include "../logger.h"
25 #include "../program.h"
26 #include "../queue.h"
27 #include "../queuemanager.h"
28 #include "../server.h"
29
30 #include <qjsonarray.h>
31 #include <qjsondocument.h>
32
33 #include <QtCore/QProcess>
34 #include <QtCore/QProcessEnvironment>
35
36 #include <QtCore/QDir>
37 #include <QtCore/QFile>
38 #include <QtCore/QProcess>
39 #include <QtCore/QTimerEvent>
40 #include <QtCore/QThread> // For ideal thread count
41
42 #include <QtWidgets/QFormLayout>
43 #include <QtWidgets/QSpinBox>
44
45 #include <QtCore/QDebug>
46
47 #ifdef _WIN32
48 #include <Windows.h> // For _PROCESS_INFORMATION (PID parsing)
49 #endif
50
51 namespace MoleQueue {
52
QueueLocal(QueueManager * parentManager)53 QueueLocal::QueueLocal(QueueManager *parentManager) :
54 Queue("Local", parentManager),
55 m_checkJobLimitTimerId(-1),
56 m_cores(-1)
57 {
58 #ifdef _WIN32
59 m_launchTemplate = "@echo off\n\n$$programExecution$$\n";
60 m_launchScriptName = "MoleQueueLauncher.bat";
61 #else // WIN32
62 m_launchTemplate = "#!/bin/bash\n\n$$programExecution$$\n";
63 m_launchScriptName = "MoleQueueLauncher.sh";
64 #endif // WIN32
65
66 // Check if new jobs need starting every 100 ms
67 m_checkJobLimitTimerId = startTimer(100);
68 }
69
~QueueLocal()70 QueueLocal::~QueueLocal()
71 {
72 }
73
writeJsonSettings(QJsonObject & json,bool exportOnly,bool includePrograms) const74 bool QueueLocal::writeJsonSettings(QJsonObject &json, bool exportOnly,
75 bool includePrograms) const
76 {
77 if (!Queue::writeJsonSettings(json, exportOnly, includePrograms))
78 return false;
79
80 json.insert("cores", static_cast<double>(m_cores));
81
82 if (!exportOnly) {
83 QJsonArray jobsToResumeArray;
84 foreach (IdType jobId, m_runningJobs.keys())
85 jobsToResumeArray.append(idTypeToJson(jobId));
86 foreach (IdType jobId, m_pendingJobQueue)
87 jobsToResumeArray.append(idTypeToJson(jobId));
88 json.insert("jobsToResume", jobsToResumeArray);
89 }
90
91 return true;
92 }
93
readJsonSettings(const QJsonObject & json,bool importOnly,bool includePrograms)94 bool QueueLocal::readJsonSettings(const QJsonObject &json, bool importOnly,
95 bool includePrograms)
96 {
97 // Validate JSON
98 if (!json.value("cores").isDouble()) {
99 Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
100 .arg(QString(QJsonDocument(json).toJson())));
101 return false;
102 }
103
104 QList<IdType> jobsToResume;
105 if (!importOnly && json.contains("jobsToResume")) {
106 if (!json.value("jobsToResume").isArray()) {
107 Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
108 .arg(QString(QJsonDocument(json).toJson())));
109 return false;
110 }
111
112 QJsonArray jobsToResumeArray = json.value("jobsToResume").toArray();
113 foreach (QJsonValue val, jobsToResumeArray) {
114 if (!val.isDouble()) {
115 Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
116 .arg(QString(QJsonDocument(json).toJson())));
117 return false;
118 }
119 jobsToResume.append(toIdType(val));
120 }
121 }
122
123 if (!Queue::readJsonSettings(json, importOnly, includePrograms))
124 return false;
125
126 // Everything is validated -- go ahead and update object.
127 m_cores = static_cast<int>(json.value("cores").toDouble() + 0.5);
128 m_pendingJobQueue = jobsToResume;
129
130 return true;
131 }
132
settingsWidget()133 AbstractQueueSettingsWidget *QueueLocal::settingsWidget()
134 {
135 LocalQueueWidget *widget = new LocalQueueWidget(this);
136 return widget;
137 }
138
submitJob(Job job)139 bool QueueLocal::submitJob(Job job)
140 {
141 if (job.isValid()) {
142 Job(job).setJobState(MoleQueue::Accepted);
143 return prepareJobForSubmission(job);;
144 }
145 Logger::logError(tr("Refusing to submit job to Queue '%1': Job object is "
146 "invalid.").arg(m_name), job.moleQueueId());
147 return false;
148 }
149
killJob(Job job)150 void QueueLocal::killJob(Job job)
151 {
152 if (!job.isValid())
153 return;
154
155 int pendingIndex = m_pendingJobQueue.indexOf(job.moleQueueId());
156 if (pendingIndex >= 0) {
157 m_pendingJobQueue.removeAt(pendingIndex);
158 job.setJobState(MoleQueue::Canceled);
159 return;
160 }
161
162 QProcess *process = m_runningJobs.take(job.moleQueueId());
163 if (process != NULL) {
164 m_jobs.remove(job.queueId());
165 process->disconnect(this);
166 process->terminate();
167 process->deleteLater();
168 job.setJobState(MoleQueue::Canceled);
169 return;
170 }
171
172 job.setJobState(MoleQueue::Canceled);
173 }
174
prepareJobForSubmission(Job & job)175 bool QueueLocal::prepareJobForSubmission(Job &job)
176 {
177 if (!writeInputFiles(job)) {
178 Logger::logError(tr("Error while writing input files."), job.moleQueueId());
179 job.setJobState(Error);
180 return false;
181 }
182 if (!addJobToQueue(job))
183 return false;
184
185 return true;
186 }
187
processStarted()188 void QueueLocal::processStarted()
189 {
190 QProcess *process = qobject_cast<QProcess*>(sender());
191 if (!process)
192 return;
193
194 IdType moleQueueId = m_runningJobs.key(process, 0);
195 if (moleQueueId == 0)
196 return;
197
198 IdType queueId;
199 #ifdef _WIN32
200 queueId = static_cast<IdType>(process->pid()->dwProcessId);
201 #else // WIN32
202 queueId = static_cast<IdType>(process->pid());
203 #endif // WIN32
204
205 // Get pointer to jobmanager to lookup job
206 if (!m_server) {
207 Logger::logError(tr("Queue '%1' cannot locate Server instance!")
208 .arg(m_name), moleQueueId);
209 return;
210 }
211 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
212 if (!job.isValid()) {
213 Logger::logError(tr("Queue '%1' Cannot update invalid Job reference!")
214 .arg(m_name), moleQueueId);
215 return;
216 }
217 job.setQueueId(queueId);
218 job.setJobState(MoleQueue::RunningLocal);
219 }
220
processFinished(int exitCode,QProcess::ExitStatus exitStatus)221 void QueueLocal::processFinished(int exitCode, QProcess::ExitStatus exitStatus)
222 {
223 Q_UNUSED(exitCode);
224 Q_UNUSED(exitStatus);
225
226 QProcess *process = qobject_cast<QProcess*>(sender());
227 if (!process)
228 return;
229
230 IdType moleQueueId = m_runningJobs.key(process, 0);
231 if (moleQueueId == 0)
232 return;
233
234 // Remove and delete QProcess from queue
235 m_runningJobs.take(moleQueueId)->deleteLater();
236
237 // Get pointer to jobmanager to lookup job
238 if (!m_server) {
239 Logger::logError(tr("Queue '%1' cannot locate Server instance!")
240 .arg(m_name), moleQueueId);
241 return;
242 }
243 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
244 if (!job.isValid()) {
245 Logger::logDebugMessage(tr("Queue '%1' Cannot update invalid Job "
246 "reference!").arg(m_name), moleQueueId);
247 return;
248 }
249
250 if (!job.outputDirectory().isEmpty() &&
251 job.outputDirectory() != job.localWorkingDirectory()) {
252 // copy function logs errors if needed
253 if (!FileSystemTools::recursiveCopyDirectory(job.localWorkingDirectory(),
254 job.outputDirectory())) {
255 Logger::logError(tr("Cannot copy '%1' -> '%2'.")
256 .arg(job.localWorkingDirectory(),
257 job.outputDirectory()), job.moleQueueId());
258 job.setJobState(MoleQueue::Error);
259 return;
260 }
261 }
262
263 if (job.cleanLocalWorkingDirectory())
264 cleanLocalDirectory(job);
265
266 job.setJobState(MoleQueue::Finished);
267 }
268
maxNumberOfCores() const269 int QueueLocal::maxNumberOfCores() const
270 {
271 if (m_cores > 0)
272 return m_cores;
273 else
274 return QThread::idealThreadCount();
275 }
276
277
addJobToQueue(const Job & job)278 bool QueueLocal::addJobToQueue(const Job &job)
279 {
280 m_pendingJobQueue.append(job.moleQueueId());
281
282 Job(job).setJobState(MoleQueue::QueuedLocal);
283
284 return true;
285 }
286
connectProcess(QProcess * proc)287 void QueueLocal::connectProcess(QProcess *proc)
288 {
289 connect(proc, SIGNAL(started()),
290 this, SLOT(processStarted()));
291 connect(proc, SIGNAL(finished(int,QProcess::ExitStatus)),
292 this, SLOT(processFinished(int,QProcess::ExitStatus)));
293 connect(proc, SIGNAL(error(QProcess::ProcessError)),
294 this, SLOT(processError(QProcess::ProcessError)));
295 }
296
checkJobQueue()297 void QueueLocal::checkJobQueue()
298 {
299 if (m_pendingJobQueue.isEmpty())
300 return;
301
302 int coresInUse = 0;
303 foreach(IdType moleQueueId, m_runningJobs.keys()) {
304 const Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
305 if (job.isValid())
306 coresInUse += job.numberOfCores();
307 }
308
309 int totalCores = maxNumberOfCores();
310 int coresAvailable = totalCores - coresInUse;
311
312 // Keep submitting jobs (FIFO) until we hit one we can't afford to start.
313 while (!m_pendingJobQueue.isEmpty() && coresAvailable > 0) {
314 IdType nextMQId = m_pendingJobQueue.first();
315 Job nextJob = m_server->jobManager()->lookupJobByMoleQueueId(nextMQId);
316 if (!nextJob.isValid()) {
317 m_pendingJobQueue.removeFirst();
318 continue;
319 }
320 else if (nextJob.numberOfCores() <= coresAvailable) {
321 m_pendingJobQueue.removeFirst();
322 if (startJob(nextJob.moleQueueId()))
323 coresAvailable -= nextJob.numberOfCores();
324 continue;
325 }
326
327 // Cannot start next job yet!
328 break;
329 }
330 }
331
startJob(IdType moleQueueId)332 bool QueueLocal::startJob(IdType moleQueueId)
333 {
334 // Get pointers to job, server, etc
335 if (!m_server) {
336 Logger::logError(tr("Queue '%1' cannot locate Server instance!")
337 .arg(m_name), moleQueueId);
338 return false;
339 }
340 const Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
341 if (!job.isValid()) {
342 Logger::logError(tr("Queue '%1' cannot locate Job with MoleQueue id %2.")
343 .arg(m_name).arg(idTypeToString(moleQueueId)),
344 moleQueueId);
345 return false;
346 }
347 const Program *program = lookupProgram(job.program());
348 if (!program) {
349 Logger::logError(tr("Queue '%1' cannot locate Program '%2'.")
350 .arg(m_name).arg(job.program()), moleQueueId);
351 return false;
352 }
353
354 FileSpecification inputFileSpec(job.inputFile());
355
356 // Create and setup process
357 QProcess *proc = new QProcess (this);
358 QDir dir (job.localWorkingDirectory());
359 proc->setWorkingDirectory(dir.absolutePath());
360
361 QStringList arguments;
362 if (!program->arguments().isEmpty())
363 arguments << program->arguments();
364
365 QString command;
366
367 // Set default command. May be overwritten later.
368 command = program->executable();
369
370 switch (program->launchSyntax()) {
371 case Program::CUSTOM:
372 #ifdef _WIN32
373 command = "cmd.exe /c " + launchScriptName();
374 #else // WIN32
375 command = "./" + launchScriptName();
376 #endif // WIN32
377 break;
378 case Program::PLAIN:
379 break;
380 case Program::INPUT_ARG:
381 arguments << inputFileSpec.filename();
382 break;
383 case Program::INPUT_ARG_NO_EXT:
384 arguments << inputFileSpec.fileBaseName();
385 break;
386 case Program::REDIRECT:
387 {
388 proc->setStandardInputFile(dir.absoluteFilePath(inputFileSpec.filename()));
389 QString outputFilename(program->outputFilename());
390 replaceKeywords(outputFilename, job, false);
391 proc->setStandardOutputFile(dir.absoluteFilePath(outputFilename));
392 }
393 break;
394 case Program::INPUT_ARG_OUTPUT_REDIRECT:
395 {
396 arguments << inputFileSpec.filename();
397 QString outputFilename(program->outputFilename());
398 replaceKeywords(outputFilename, job, false);
399 proc->setStandardOutputFile(dir.absoluteFilePath(outputFilename));
400 }
401 break;
402 case Program::SYNTAX_COUNT:
403 default:
404 Logger::logError(tr("Unknown launcher syntax for program %1: %2.")
405 .arg(job.program()).arg(program->launchSyntax()),
406 moleQueueId);
407 return false;
408 }
409
410 connectProcess(proc);
411
412 // Handle any keywords in the arguments
413 QString args = arguments.join(" ");
414 replaceKeywords(args, job, false);
415
416 Logger::logNotification(tr("Executing '%1 %2' in %3", "command, args, dir")
417 .arg(command).arg(args)
418 .arg(proc->workingDirectory()),
419 job.moleQueueId());
420 m_runningJobs.insert(job.moleQueueId(), proc);
421 proc->start(command + " " + args);
422
423 return true;
424 }
425
timerEvent(QTimerEvent * theEvent)426 void QueueLocal::timerEvent(QTimerEvent *theEvent)
427 {
428 if (theEvent->timerId() == m_checkJobLimitTimerId) {
429 checkJobQueue();
430 theEvent->accept();
431 return;
432 }
433
434 QObject::timerEvent(theEvent);
435 }
436
processError(QProcess::ProcessError error)437 void QueueLocal::processError(QProcess::ProcessError error)
438 {
439 QProcess *process = qobject_cast<QProcess*>(sender());
440 if (!process)
441 return;
442
443 IdType moleQueueId = m_runningJobs.key(process, 0);
444 if (moleQueueId == 0)
445 return;
446
447 // Remove and delete QProcess from queue
448 m_runningJobs.take(moleQueueId)->deleteLater();
449
450 if (!m_server) {
451 Logger::logError(tr("Queue '%1' cannot locate Server instance!")
452 .arg(m_name), moleQueueId);
453 return;
454 }
455
456 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
457 if (!job.isValid()) {
458 Logger::logDebugMessage(tr("Queue '%1' Cannot update invalid Job "
459 "reference!").arg(m_name), moleQueueId);
460 return;
461 }
462
463 QString errorString = QueueLocal::processErrorToString(error);
464 Logger::logError(tr("Execution of \'%1\' failed with process \'%2\': %3")
465 .arg(job.program()).arg(errorString)
466 .arg(process->errorString()), moleQueueId);
467
468 job.setJobState(MoleQueue::Error);
469 }
470
471
472 /**
473 * Convert a ProcessError value to a string.
474 *
475 * @param error ProcessError
476 * @return C string
477 */
processErrorToString(QProcess::ProcessError error)478 QString QueueLocal::processErrorToString(QProcess::ProcessError error)
479 {
480 switch(error)
481 {
482 case QProcess::FailedToStart:
483 return tr("Failed to start");
484 case QProcess::Crashed:
485 return tr("Crashed");
486 case QProcess::Timedout:
487 return tr("Timed out");
488 case QProcess::WriteError:
489 return tr("Write error");
490 case QProcess::ReadError:
491 return tr("Read error");
492 case QProcess::UnknownError:
493 return tr("Unknown error");
494 }
495
496 Logger::logError(tr("Unrecognized Process Error: %1").arg(error));
497
498 return tr("Unrecognized process error");
499 }
500
501 } // End namespace
502