1 /******************************************************************************
2   This source file is part of the Avogadro project.
3   This source code is released under the 3-Clause BSD License, (see "LICENSE").
4 ******************************************************************************/
5 
6 #include "batchjob.h"
7 #include "molequeuemanager.h"
8 
9 #include <QtCore/QDebug>
10 
11 #include <limits>
12 
13 namespace Avogadro {
14 namespace MoleQueue {
15 
16 // initialize statics
17 const BatchJob::BatchId BatchJob::InvalidBatchId = -1;
18 const BatchJob::RequestId BatchJob::InvalidRequestId = -1;
19 const BatchJob::ServerId BatchJob::InvalidServerId =
20   std::numeric_limits<BatchJob::ServerId>::max();
21 
BatchJob(QObject * par)22 BatchJob::BatchJob(QObject* par) : QObject(par)
23 {
24   setup();
25 }
26 
BatchJob(const QString & scriptFilePath,QObject * par)27 BatchJob::BatchJob(const QString& scriptFilePath, QObject* par)
28   : QObject(par), m_inputGenerator(scriptFilePath)
29 {
30   setup();
31 }
32 
~BatchJob()33 BatchJob::~BatchJob()
34 {
35 }
36 
submitNextJob(const Core::Molecule & mol)37 BatchJob::BatchId BatchJob::submitNextJob(const Core::Molecule& mol)
38 {
39   // Is everything configured?
40   if (!m_inputGenerator.isValid() || m_inputGeneratorOptions.empty() ||
41       m_moleQueueOptions.empty()) {
42     return InvalidBatchId;
43   }
44 
45   // Verify that molequeue is running:
46   MoleQueueManager& mqManager = MoleQueueManager::instance();
47   if (!mqManager.connectIfNeeded())
48     return InvalidBatchId;
49 
50   // Generate the input:
51   if (!m_inputGenerator.generateInput(m_inputGeneratorOptions, mol)) {
52     if (!m_inputGenerator.errorList().isEmpty()) {
53       qWarning() << "BatchJob::submitNextJob() error:\n\t"
54                  << m_inputGenerator.errorList().join("\n\t");
55     }
56     return InvalidBatchId;
57   }
58 
59   // Warnings are non-fatal -- just print them for now:
60   if (!m_inputGenerator.warningList().isEmpty()) {
61     qWarning() << "BatchJob::submitNextJob() warning:\n\t"
62                << m_inputGenerator.warningList().join("\n\t");
63   }
64 
65   BatchId bId = m_jobObjects.size();
66 
67   // Create the job object:
68   JobObject job;
69   job.fromJson(m_moleQueueOptions);
70   job.setDescription(
71     tr("Batch Job #%L1 (%2)").arg(bId + 1).arg(job.description()));
72 
73   // Main input file:
74   const QString mainFileName = m_inputGenerator.mainFileName();
75   job.setInputFile(mainFileName, m_inputGenerator.fileContents(mainFileName));
76 
77   // Any additional input files:
78   QStringList fileNames = m_inputGenerator.fileNames();
79   fileNames.removeOne(mainFileName);
80   foreach (const QString& fn, fileNames)
81     job.appendAdditionalInputFile(fn, m_inputGenerator.fileContents(fn));
82 
83   // Submit the job
84   RequestId rId = mqManager.client().submitJob(job);
85 
86   // Was submission successful?
87   if (rId < 0)
88     return InvalidBatchId;
89 
90   // Register the job and assign the ID.
91   m_jobObjects.push_back(job);
92   m_states.push_back(None);
93   m_requests.insert(rId, Request(Request::SubmitJob, bId));
94 
95   return bId;
96 }
97 
lookupJob(BatchId bId)98 bool BatchJob::lookupJob(BatchId bId)
99 {
100   ServerId sId = serverId(static_cast<BatchId>(bId));
101   if (sId == InvalidServerId)
102     return false;
103 
104   // Verify that molequeue is running:
105   MoleQueueManager& mqManager = MoleQueueManager::instance();
106   if (!mqManager.connectIfNeeded())
107     return false;
108 
109   Client& client = mqManager.client();
110   RequestId rId = client.lookupJob(sId);
111   m_requests.insert(rId, Request(Request::LookupJob, bId));
112   return true;
113 }
114 
handleSubmissionReply(int rId,unsigned int sId)115 void BatchJob::handleSubmissionReply(int rId, unsigned int sId)
116 {
117   Request req = m_requests.value(rId);
118   if (req.isValid()) {
119     m_requests.remove(rId);
120     if (req.batchId >= m_jobObjects.size()) {
121       qWarning() << "BatchJob::handleSubmissionReply(): batchID out of range.";
122       return;
123     }
124     m_jobObjects[req.batchId].setValue("moleQueueId",
125                                        QVariant(static_cast<ServerId>(sId)));
126     m_serverIds.insert(sId, req.batchId);
127     // Request full job details:
128     lookupJob(req.batchId);
129   }
130 }
131 
handleJobStateChange(unsigned int sId,const QString &,const QString &)132 void BatchJob::handleJobStateChange(unsigned int sId, const QString&,
133                                     const QString&)
134 {
135   BatchId bId = m_serverIds.value(static_cast<ServerId>(sId), InvalidBatchId);
136   if (bId == InvalidBatchId)
137     return;
138   // Update full job details:
139   lookupJob(bId);
140 }
141 
handleLookupJobReply(int rId,const QJsonObject & jobInfo)142 void BatchJob::handleLookupJobReply(int rId, const QJsonObject& jobInfo)
143 {
144   Request req = m_requests.value(rId);
145   if (req.isValid()) {
146     m_requests.remove(rId);
147     if (req.batchId >= m_jobObjects.size()) {
148       qWarning() << "BatchJob::handleSubmissionReply(): batchID out of range.";
149       return;
150     }
151     JobObject& job(m_jobObjects[req.batchId]);
152     job.fromJson(jobInfo);
153 
154     JobState oldState = m_states[req.batchId];
155     JobState newState = stringToState(job.value("jobState").toString());
156     m_states[req.batchId] = newState;
157     emit jobUpdated(req.batchId, true);
158     if (!isTerminal(oldState) && isTerminal(newState))
159       emit jobCompleted(req.batchId, newState);
160   }
161 }
162 
handleErrorResponse(int requestId,int errorCode,const QString & errorMessage,const QJsonValue & errorData)163 void BatchJob::handleErrorResponse(int requestId, int errorCode,
164                                    const QString& errorMessage,
165                                    const QJsonValue& errorData)
166 {
167   qDebug() << "Error rcv'd: {"
168            << "requestId:" << requestId << "errorCode:" << errorCode
169            << "errorMessage:" << errorMessage << "errorData:" << errorData
170            << "}";
171 
172   Request req = m_requests.value(requestId);
173 
174   if (!req.isValid())
175     return;
176 
177   m_requests.remove(requestId);
178 
179   if (req.batchId < m_jobObjects.size())
180     return;
181 
182   switch (req.type) {
183     case Request::SubmitJob:
184       // The job was rejected:
185       qDebug() << "Batch job" << req.batchId << "was rejected by MoleQueue.";
186       m_states[req.batchId] = Rejected;
187       m_jobObjects[req.batchId].fromJson(QJsonObject());
188       break;
189     case Request::LookupJob:
190       qDebug() << "Batch job" << req.batchId << "failed to update.";
191       emit jobUpdated(req.batchId, false);
192       break;
193     default:
194     case Request::InvalidType:
195       break;
196   }
197 }
198 
setup()199 void BatchJob::setup()
200 {
201   static bool metaTypesRegistered = false;
202   if (!metaTypesRegistered) {
203     qRegisterMetaType<BatchId>("Avogadro::QtGui::BatchJob::BatchId");
204     qRegisterMetaType<BatchId>("BatchId");
205     qRegisterMetaType<ServerId>("Avogadro::QtGui::BatchJob::ServerId");
206     qRegisterMetaType<ServerId>("ServerId");
207     qRegisterMetaType<RequestId>("Avogadro::QtGui::BatchJob::RequestId");
208     qRegisterMetaType<RequestId>("RequestId");
209     metaTypesRegistered = true;
210   }
211 
212   MoleQueueManager& mqManager = MoleQueueManager::instance();
213   Client& client = mqManager.client();
214   connect(&client, SIGNAL(submitJobResponse(int, uint)),
215           SLOT(handleSubmissionReply(int, uint)));
216   connect(&client, SIGNAL(lookupJobResponse(int, QJsonObject)),
217           SLOT(handleLookupJobReply(int, QJsonObject)));
218   connect(&client, SIGNAL(jobStateChanged(uint, QString, QString)),
219           SLOT(handleJobStateChange(uint, QString, QString)));
220   connect(&client, SIGNAL(errorReceived(int, int, QString, QJsonValue)),
221           SLOT(handleErrorResponse(int, int, QString, QJsonValue)));
222 }
223 
224 } // namespace MoleQueue
225 } // namespace Avogadro
226