1 /******************************************************************************
2
3 This source file is part of the MoleQueue project.
4
5 Copyright 2011-2012 Kitware, Inc.
6
7 This source code is released under the New BSD License, (the "License").
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14
15 ******************************************************************************/
16
17 #include "remotessh.h"
18
19 #include "../filesystemtools.h"
20 #include "../job.h"
21 #include "../jobmanager.h"
22 #include "../logentry.h"
23 #include "../logger.h"
24 #include "../program.h"
25 #include "../remotequeuewidget.h"
26 #include "../server.h"
27 #include "../sshcommandfactory.h"
28
29 #include <qjsondocument.h>
30
31 #include <QtCore/QTimer>
32 #include <QtCore/QDebug>
33
34 #include <QtGui>
35
36 namespace MoleQueue {
37
QueueRemoteSsh(const QString & queueName,QueueManager * parentObject)38 QueueRemoteSsh::QueueRemoteSsh(const QString &queueName, QueueManager *parentObject)
39 : QueueRemote(queueName, parentObject),
40 m_sshExecutable(SshCommandFactory::defaultSshCommand()),
41 m_scpExecutable(SshCommandFactory::defaultScpCommand()),
42 m_sshPort(22),
43 m_isCheckingQueue(false)
44 {
45 // Check for jobs to submit every 5 seconds
46 m_checkForPendingJobsTimerId = startTimer(5000);
47
48 // Always allow m_requestQueueCommand to return 0
49 m_allowedQueueRequestExitCodes.append(0);
50 }
51
~QueueRemoteSsh()52 QueueRemoteSsh::~QueueRemoteSsh()
53 {
54 }
55
writeJsonSettings(QJsonObject & json,bool exportOnly,bool includePrograms) const56 bool QueueRemoteSsh::writeJsonSettings(QJsonObject &json, bool exportOnly,
57 bool includePrograms) const
58 {
59 if (!QueueRemote::writeJsonSettings(json, exportOnly, includePrograms))
60 return false;
61
62 json.insert("submissionCommand", m_submissionCommand);
63 json.insert("requestQueueCommand", m_requestQueueCommand);
64 json.insert("killCommand", m_killCommand);
65 json.insert("hostName", m_hostName);
66 json.insert("sshPort", static_cast<double>(m_sshPort));
67
68 if (!exportOnly) {
69 json.insert("sshExecutable", m_sshExecutable);
70 json.insert("scpExecutable", m_scpExecutable);
71 json.insert("userName", m_userName);
72 json.insert("identityFile", m_identityFile);
73 }
74
75 return true;
76 }
77
readJsonSettings(const QJsonObject & json,bool importOnly,bool includePrograms)78 bool QueueRemoteSsh::readJsonSettings(const QJsonObject &json, bool importOnly,
79 bool includePrograms)
80 {
81 // Validate JSON
82 if (!json.value("submissionCommand").isString() ||
83 !json.value("requestQueueCommand").isString() ||
84 !json.value("killCommand").isString() ||
85 !json.value("hostName").isString() ||
86 !json.value("sshPort").isDouble() ||
87 (!importOnly && (
88 !json.value("sshExecutable").isString() ||
89 !json.value("scpExecutable").isString() ||
90 !json.value("userName").isString() ||
91 !json.value("identityFile").isString()))) {
92 Logger::logError(tr("Error reading queue settings: Invalid format:\n%1")
93 .arg(QString(QJsonDocument(json).toJson())));
94 return false;
95 }
96
97 if (!QueueRemote::readJsonSettings(json, importOnly, includePrograms))
98 return false;
99
100 m_submissionCommand = json.value("submissionCommand").toString();
101 m_requestQueueCommand = json.value("requestQueueCommand").toString();
102 m_killCommand = json.value("killCommand").toString();
103 m_hostName = json.value("hostName").toString();
104 m_sshPort = static_cast<int>(json.value("sshPort").toDouble() + 0.5);
105
106 if (!importOnly) {
107 m_sshExecutable = json.value("sshExecutable").toString();
108 m_scpExecutable = json.value("scpExecutable").toString();
109 m_userName = json.value("userName").toString();
110 m_identityFile = json.value("identityFile").toString();
111 }
112
113 return true;
114 }
115
settingsWidget()116 AbstractQueueSettingsWidget* QueueRemoteSsh::settingsWidget()
117 {
118 RemoteQueueWidget *widget = new RemoteQueueWidget (this);
119 return widget;
120 }
121
createRemoteDirectory(Job job)122 void QueueRemoteSsh::createRemoteDirectory(Job job)
123 {
124 // Note that this is just the working directory base -- the job folder is
125 // created by scp.
126 QString remoteDir = QString("%1").arg(m_workingDirectoryBase);
127
128 SshConnection *conn = newSshConnection();
129 conn->setData(QVariant::fromValue(job));
130 connect(conn, SIGNAL(requestComplete()), this, SLOT(remoteDirectoryCreated()));
131
132 if (!conn->execute(QString("mkdir -p %1").arg(remoteDir))) {
133 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
134 " '%2' port = '%3'")
135 .arg(conn->userName()).arg(conn->hostName())
136 .arg(conn->portNumber()), job.moleQueueId());
137 job.setJobState(MoleQueue::Error);
138 conn->deleteLater();
139 return;
140 }
141 }
142
remoteDirectoryCreated()143 void QueueRemoteSsh::remoteDirectoryCreated()
144 {
145 SshConnection *conn = qobject_cast<SshConnection*>(sender());
146 if (!conn) {
147 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
148 .arg("Sender is not an SshConnection!"));
149 return;
150 }
151 conn->deleteLater();
152
153 Job job = conn->data().value<Job>();
154
155 if (!job.isValid()) {
156 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
157 .arg("Sender does not have an associated job!"));
158 return;
159 }
160
161 if (conn->exitCode() != 0) {
162 Logger::logWarning(tr("Cannot create remote directory '%1@%2:%3'.\n"
163 "Exit code (%4) %5")
164 .arg(conn->userName()).arg(conn->hostName())
165 .arg(m_workingDirectoryBase).arg(conn->exitCode())
166 .arg(conn->output()), job.moleQueueId());
167 // Retry submission:
168 if (addJobFailure(job.moleQueueId()))
169 m_pendingSubmission.append(job.moleQueueId());
170 job.setJobState(MoleQueue::Error);
171 return;
172 }
173
174 copyInputFilesToHost(job);
175 }
176
copyInputFilesToHost(Job job)177 void QueueRemoteSsh::copyInputFilesToHost(Job job)
178 {
179 QString localDir = job.localWorkingDirectory();
180 QString remoteDir = QDir::cleanPath(QString("%1/%2")
181 .arg(m_workingDirectoryBase)
182 .arg(idTypeToString(job.moleQueueId())));
183
184 SshConnection *conn = newSshConnection();
185 conn->setData(QVariant::fromValue(job));
186 connect(conn, SIGNAL(requestComplete()), this, SLOT(inputFilesCopied()));
187
188 if (!conn->copyDirTo(localDir, remoteDir)) {
189 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
190 " '%2' port = '%3'")
191 .arg(conn->userName()).arg(conn->hostName())
192 .arg(conn->portNumber()), job.moleQueueId());
193 job.setJobState(MoleQueue::Error);
194 conn->deleteLater();
195 return;
196 }
197 }
198
inputFilesCopied()199 void QueueRemoteSsh::inputFilesCopied()
200 {
201 SshConnection *conn = qobject_cast<SshConnection*>(sender());
202 if (!conn) {
203 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
204 .arg("Sender is not an SshConnection!"));
205 return;
206 }
207 conn->deleteLater();
208
209 Job job = conn->data().value<Job>();
210
211 if (!job.isValid()) {
212 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
213 .arg("Sender does not have an associated job!"));
214 return;
215 }
216
217 if (conn->exitCode() != 0) {
218 // Check if we just need to make the parent directory
219 if (conn->exitCode() == 1 &&
220 conn->output().contains("No such file or directory")) {
221 Logger::logDebugMessage(tr("Remote working directory missing on remote "
222 "host. Creating now..."), job.moleQueueId());
223 createRemoteDirectory(job);
224 return;
225 }
226 Logger::logWarning(tr("Error while copying input files to remote host:\n"
227 "'%1' --> '%2/'\nExit code (%3) %4")
228 .arg(job.localWorkingDirectory())
229 .arg(m_workingDirectoryBase)
230 .arg(conn->exitCode()).arg(conn->output()),
231 job.moleQueueId());
232 // Retry submission:
233 if (addJobFailure(job.moleQueueId()))
234 m_pendingSubmission.append(job.moleQueueId());
235 job.setJobState(MoleQueue::Error);
236 return;
237 }
238
239 submitJobToRemoteQueue(job);
240 }
241
submitJobToRemoteQueue(Job job)242 void QueueRemoteSsh::submitJobToRemoteQueue(Job job)
243 {
244 const QString command = QString("cd %1/%2 && %3 %4")
245 .arg(m_workingDirectoryBase)
246 .arg(idTypeToString(job.moleQueueId()))
247 .arg(m_submissionCommand)
248 .arg(m_launchScriptName);
249
250 SshConnection *conn = newSshConnection();
251 conn->setData(QVariant::fromValue(job));
252 connect(conn, SIGNAL(requestComplete()),
253 this, SLOT(jobSubmittedToRemoteQueue()));
254
255 if (!conn->execute(command)) {
256 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
257 " '%2' port = '%3'")
258 .arg(conn->userName()).arg(conn->hostName())
259 .arg(conn->portNumber()), job.moleQueueId());
260 job.setJobState(MoleQueue::Error);
261 conn->deleteLater();
262 return;
263 }
264 }
265
jobSubmittedToRemoteQueue()266 void QueueRemoteSsh::jobSubmittedToRemoteQueue()
267 {
268 SshConnection *conn = qobject_cast<SshConnection*>(sender());
269 if (!conn) {
270 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
271 .arg("Sender is not an SshConnection!"));
272 return;
273 }
274 conn->deleteLater();
275
276 IdType queueId(0);
277 parseQueueId(conn->output(), &queueId);
278 Job job = conn->data().value<Job>();
279
280 if (!job.isValid()) {
281 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
282 .arg("Sender does not have an associated job!"));
283 return;
284 }
285
286 if (conn->exitCode() != 0) {
287 Logger::logWarning(tr("Could not submit job to remote queue on %1@%2:%3\n"
288 "%4 %5/%6/%7\nExit code (%8) %9")
289 .arg(conn->userName()).arg(conn->hostName())
290 .arg(conn->portNumber()).arg(m_submissionCommand)
291 .arg(m_workingDirectoryBase)
292 .arg(idTypeToString(job.moleQueueId()))
293 .arg(m_launchScriptName).arg(conn->exitCode())
294 .arg(conn->output()), job.moleQueueId());
295 // Retry submission:
296 if (addJobFailure(job.moleQueueId()))
297 m_pendingSubmission.append(job.moleQueueId());
298 job.setJobState(MoleQueue::Error);
299 return;
300 }
301
302 job.setJobState(MoleQueue::Submitted);
303 clearJobFailures(job.moleQueueId());
304 job.setQueueId(queueId);
305 m_jobs.insert(queueId, job.moleQueueId());
306 }
307
requestQueueUpdate()308 void QueueRemoteSsh::requestQueueUpdate()
309 {
310 if (m_isCheckingQueue)
311 return;
312
313 if (m_jobs.isEmpty())
314 return;
315
316 m_isCheckingQueue = true;
317
318 const QString command = generateQueueRequestCommand();
319
320 SshConnection *conn = newSshConnection();
321 connect(conn, SIGNAL(requestComplete()),
322 this, SLOT(handleQueueUpdate()));
323
324 if (!conn->execute(command)) {
325 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
326 " '%2' port = '%3'")
327 .arg(conn->userName()).arg(conn->hostName())
328 .arg(conn->portNumber()));
329 conn->deleteLater();
330 return;
331 }
332 }
333
handleQueueUpdate()334 void QueueRemoteSsh::handleQueueUpdate()
335 {
336 SshConnection *conn = qobject_cast<SshConnection*>(sender());
337 if (!conn) {
338 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
339 .arg("Sender is not an SshConnection!"));
340 m_isCheckingQueue = false;
341 return;
342 }
343 conn->deleteLater();
344
345 if (!m_allowedQueueRequestExitCodes.contains(conn->exitCode())) {
346 Logger::logWarning(tr("Error requesting queue data (%1 -u %2) on remote "
347 "host %3@%4:%5. Exit code (%6) %7")
348 .arg(m_requestQueueCommand)
349 .arg(m_userName).arg(conn->userName())
350 .arg(conn->hostName()).arg(conn->portNumber())
351 .arg(conn->exitCode()).arg(conn->output()));
352 m_isCheckingQueue = false;
353 return;
354 }
355
356 QStringList output = conn->output().split("\n", QString::SkipEmptyParts);
357
358 // Get list of submitted queue ids so that we detect when jobs have left
359 // the queue.
360 QList<IdType> queueIds = m_jobs.keys();
361
362 MoleQueue::JobState state;
363 foreach (QString line, output) {
364 IdType queueId;
365 if (parseQueueLine(line, &queueId, &state)) {
366 IdType moleQueueId = m_jobs.value(queueId, InvalidId);
367 if (moleQueueId != InvalidId) {
368 queueIds.removeOne(queueId);
369 // Get pointer to jobmanager to lookup job
370 if (!m_server) {
371 Logger::logError(tr("Queue '%1' cannot locate Server instance!")
372 .arg(m_name), moleQueueId);
373 m_isCheckingQueue = false;
374 return;
375 }
376 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
377 if (!job.isValid()) {
378 Logger::logError(tr("Queue '%1' Cannot update invalid Job reference!")
379 .arg(m_name), moleQueueId);
380 continue;
381 }
382 job.setJobState(state);
383 }
384 }
385 }
386
387 // Now copy back any jobs that have left the queue
388 foreach (IdType queueId, queueIds)
389 beginFinalizeJob(queueId);
390
391 m_isCheckingQueue = false;
392 }
393
beginFinalizeJob(IdType queueId)394 void QueueRemoteSsh::beginFinalizeJob(IdType queueId)
395 {
396 IdType moleQueueId = m_jobs.value(queueId, InvalidId);
397 if (moleQueueId == InvalidId)
398 return;
399
400 m_jobs.remove(queueId);
401
402 // Lookup job
403 if (!m_server)
404 return;
405 Job job = m_server->jobManager()->lookupJobByMoleQueueId(moleQueueId);
406 if (!job.isValid())
407 return;
408
409 finalizeJobCopyFromServer(job);
410 }
411
finalizeJobCopyFromServer(Job job)412 void QueueRemoteSsh::finalizeJobCopyFromServer(Job job)
413 {
414 if (!job.retrieveOutput() ||
415 (job.cleanLocalWorkingDirectory() && job.outputDirectory().isEmpty())
416 ) {
417 // Jump to next step
418 finalizeJobCopyToCustomDestination(job);
419 return;
420 }
421
422 QString localDir = job.localWorkingDirectory() + "/..";
423 QString remoteDir =
424 QString("%1/%2").arg(m_workingDirectoryBase)
425 .arg(idTypeToString(job.moleQueueId()));
426 SshConnection *conn = newSshConnection();
427 conn->setData(QVariant::fromValue(job));
428 connect(conn, SIGNAL(requestComplete()),
429 this, SLOT(finalizeJobOutputCopiedFromServer()));
430
431 if (!conn->copyDirFrom(remoteDir, localDir)) {
432 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
433 " '%2' port = '%3'")
434 .arg(conn->userName()).arg(conn->hostName())
435 .arg(conn->portNumber()), job.moleQueueId());
436 job.setJobState(MoleQueue::Error);
437 conn->deleteLater();
438 return;
439 }
440 }
441
finalizeJobOutputCopiedFromServer()442 void QueueRemoteSsh::finalizeJobOutputCopiedFromServer()
443 {
444 SshConnection *conn = qobject_cast<SshConnection*>(sender());
445 if (!conn) {
446 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
447 .arg("Sender is not an SshConnection!"));
448 return;
449 }
450 conn->deleteLater();
451
452 Job job = conn->data().value<Job>();
453
454 if (!job.isValid()) {
455 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
456 .arg("Sender does not have an associated job!"));
457 return;
458 }
459
460 if (conn->exitCode() != 0) {
461 Logger::logError(tr("Error while copying job output from remote server:\n"
462 "%1@%2:%3 --> %4\nExit code (%5) %6")
463 .arg(conn->userName()).arg(conn->hostName())
464 .arg(conn->portNumber()).arg(job.localWorkingDirectory())
465 .arg(conn->exitCode()).arg(conn->output()),
466 job.moleQueueId());
467 job.setJobState(MoleQueue::Error);
468 return;
469 }
470
471 finalizeJobCopyToCustomDestination(job);
472 }
473
finalizeJobCopyToCustomDestination(Job job)474 void QueueRemoteSsh::finalizeJobCopyToCustomDestination(Job job)
475 {
476 // Skip to next step if needed
477 if (job.outputDirectory().isEmpty() ||
478 job.outputDirectory() == job.localWorkingDirectory()) {
479 finalizeJobCleanup(job);
480 return;
481 }
482
483 // The copy function will throw errors if needed.
484 if (!FileSystemTools::recursiveCopyDirectory(job.localWorkingDirectory(),
485 job.outputDirectory())) {
486 Logger::logError(tr("Cannot copy '%1' -> '%2'.")
487 .arg(job.localWorkingDirectory(),
488 job.outputDirectory()), job.moleQueueId());
489 job.setJobState(MoleQueue::Error);
490 return;
491 }
492
493 finalizeJobCleanup(job);
494 }
495
finalizeJobCleanup(Job job)496 void QueueRemoteSsh::finalizeJobCleanup(Job job)
497 {
498 if (job.cleanLocalWorkingDirectory())
499 cleanLocalDirectory(job);
500
501 if (job.cleanRemoteFiles())
502 cleanRemoteDirectory(job);
503
504 job.setJobState(MoleQueue::Finished);
505 }
506
cleanRemoteDirectory(Job job)507 void QueueRemoteSsh::cleanRemoteDirectory(Job job)
508 {
509 QString remoteDir = QDir::cleanPath(
510 QString("%1/%2").arg(m_workingDirectoryBase)
511 .arg(idTypeToString(job.moleQueueId())));
512
513 // Check that the remoteDir is not just "/" due to another bug.
514 if (remoteDir.simplified() == "/") {
515 Logger::logError(tr("Refusing to clean remote directory %1 -- an internal "
516 "error has occurred.").arg(remoteDir),
517 job.moleQueueId());
518 return;
519 }
520
521 QString command = QString ("rm -rf %1").arg(remoteDir);
522
523 SshConnection *conn = newSshConnection();
524 conn->setData(QVariant::fromValue(job));
525 connect(conn, SIGNAL(requestComplete()),
526 this, SLOT(remoteDirectoryCleaned()));
527
528 if (!conn->execute(command)) {
529 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
530 " '%2' port = '%3'")
531 .arg(conn->userName()).arg(conn->hostName())
532 .arg(conn->portNumber()), job.moleQueueId());
533 conn->deleteLater();
534 return;
535 }
536 }
537
remoteDirectoryCleaned()538 void QueueRemoteSsh::remoteDirectoryCleaned()
539 {
540 SshConnection *conn = qobject_cast<SshConnection*>(sender());
541 if (!conn) {
542 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
543 .arg("Sender is not an SshConnection!"));
544 return;
545 }
546 conn->deleteLater();
547
548 Job job = conn->data().value<Job>();
549
550 if (!job.isValid()) {
551 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
552 .arg("Sender does not have an associated job!"));
553 return;
554 }
555
556 if (conn->exitCode() != 0) {
557 Logger::logError(tr("Error clearing remote directory '%1@%2:%3/%4'.\n"
558 "Exit code (%5) %6")
559 .arg(conn->userName()).arg(conn->hostName())
560 .arg(m_workingDirectoryBase)
561 .arg(idTypeToString(job.moleQueueId()))
562 .arg(conn->exitCode()).arg(conn->output()),
563 job.moleQueueId());
564 job.setJobState(MoleQueue::Error);
565 return;
566 }
567 }
568
beginKillJob(Job job)569 void QueueRemoteSsh::beginKillJob(Job job)
570 {
571 const QString command = QString("%1 %2")
572 .arg(m_killCommand)
573 .arg(idTypeToString(job.queueId()));
574
575 SshConnection *conn = newSshConnection();
576 conn->setData(QVariant::fromValue(job));
577 connect(conn, SIGNAL(requestComplete()),
578 this, SLOT(endKillJob()));
579
580 if (!conn->execute(command)) {
581 Logger::logError(tr("Could not initialize ssh resources: user= '%1'\nhost ="
582 " '%2' port = '%3'")
583 .arg(conn->userName()).arg(conn->hostName())
584 .arg(conn->portNumber()), job.moleQueueId());
585 job.setJobState(MoleQueue::Error);
586 conn->deleteLater();
587 return;
588 }
589 }
590
endKillJob()591 void QueueRemoteSsh::endKillJob()
592 {
593 SshConnection *conn = qobject_cast<SshConnection*>(sender());
594 if (!conn) {
595 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
596 .arg("Sender is not an SshConnection!"));
597 return;
598 }
599 conn->deleteLater();
600
601 Job job = conn->data().value<Job>();
602 if (!job.isValid()) {
603 Logger::logError(tr("Internal error: %1\n%2").arg(Q_FUNC_INFO)
604 .arg("Sender does not have an associated job!"));
605 return;
606 }
607
608 if (conn->exitCode() != 0) {
609 Logger::logWarning(tr("Error cancelling job (mqid=%1, queueid=%2) on "
610 "%3@%4:%5 (queue=%6)\n(%7) %8")
611 .arg(idTypeToString(job.moleQueueId()))
612 .arg(idTypeToString(job.queueId()))
613 .arg(conn->userName()).arg(conn->hostName())
614 .arg(conn->portNumber()).arg(m_name)
615 .arg(conn->exitCode()).arg(conn->output()));
616 return;
617 }
618
619 job.setJobState(MoleQueue::Canceled);
620 }
621
newSshConnection()622 SshConnection *QueueRemoteSsh::newSshConnection()
623 {
624 SshCommand *command = SshCommandFactory::instance()->newSshCommand();
625 command->setSshCommand(m_sshExecutable);
626 command->setScpCommand(m_scpExecutable);
627 command->setHostName(m_hostName);
628 command->setUserName(m_userName);
629 command->setIdentityFile(m_identityFile);
630 command->setPortNumber(m_sshPort);
631
632 return command;
633 }
634
generateQueueRequestCommand()635 QString QueueRemoteSsh::generateQueueRequestCommand()
636 {
637 QList<IdType> queueIds = m_jobs.keys();
638 QString queueIdString;
639 foreach (IdType id, queueIds) {
640 if (id != InvalidId)
641 queueIdString += QString::number(id) + " ";
642 }
643
644 return QString ("%1 %2").arg(m_requestQueueCommand).arg(queueIdString);
645 }
646
647 } // End namespace
648