1 /******************************************************************************
2 This source file is part of the Avogadro project.
3 This source code is released under the 3-Clause BSD License, (see "LICENSE").
4 ******************************************************************************/
5
6 #include "batchjob.h"
7 #include "molequeuemanager.h"
8
9 #include <QtCore/QDebug>
10
11 #include <limits>
12
13 namespace Avogadro {
14 namespace MoleQueue {
15
16 // initialize statics
17 const BatchJob::BatchId BatchJob::InvalidBatchId = -1;
18 const BatchJob::RequestId BatchJob::InvalidRequestId = -1;
19 const BatchJob::ServerId BatchJob::InvalidServerId =
20 std::numeric_limits<BatchJob::ServerId>::max();
21
BatchJob(QObject * par)22 BatchJob::BatchJob(QObject* par) : QObject(par)
23 {
24 setup();
25 }
26
BatchJob(const QString & scriptFilePath,QObject * par)27 BatchJob::BatchJob(const QString& scriptFilePath, QObject* par)
28 : QObject(par), m_inputGenerator(scriptFilePath)
29 {
30 setup();
31 }
32
~BatchJob()33 BatchJob::~BatchJob()
34 {
35 }
36
submitNextJob(const Core::Molecule & mol)37 BatchJob::BatchId BatchJob::submitNextJob(const Core::Molecule& mol)
38 {
39 // Is everything configured?
40 if (!m_inputGenerator.isValid() || m_inputGeneratorOptions.empty() ||
41 m_moleQueueOptions.empty()) {
42 return InvalidBatchId;
43 }
44
45 // Verify that molequeue is running:
46 MoleQueueManager& mqManager = MoleQueueManager::instance();
47 if (!mqManager.connectIfNeeded())
48 return InvalidBatchId;
49
50 // Generate the input:
51 if (!m_inputGenerator.generateInput(m_inputGeneratorOptions, mol)) {
52 if (!m_inputGenerator.errorList().isEmpty()) {
53 qWarning() << "BatchJob::submitNextJob() error:\n\t"
54 << m_inputGenerator.errorList().join("\n\t");
55 }
56 return InvalidBatchId;
57 }
58
59 // Warnings are non-fatal -- just print them for now:
60 if (!m_inputGenerator.warningList().isEmpty()) {
61 qWarning() << "BatchJob::submitNextJob() warning:\n\t"
62 << m_inputGenerator.warningList().join("\n\t");
63 }
64
65 BatchId bId = m_jobObjects.size();
66
67 // Create the job object:
68 JobObject job;
69 job.fromJson(m_moleQueueOptions);
70 job.setDescription(
71 tr("Batch Job #%L1 (%2)").arg(bId + 1).arg(job.description()));
72
73 // Main input file:
74 const QString mainFileName = m_inputGenerator.mainFileName();
75 job.setInputFile(mainFileName, m_inputGenerator.fileContents(mainFileName));
76
77 // Any additional input files:
78 QStringList fileNames = m_inputGenerator.fileNames();
79 fileNames.removeOne(mainFileName);
80 foreach (const QString& fn, fileNames)
81 job.appendAdditionalInputFile(fn, m_inputGenerator.fileContents(fn));
82
83 // Submit the job
84 RequestId rId = mqManager.client().submitJob(job);
85
86 // Was submission successful?
87 if (rId < 0)
88 return InvalidBatchId;
89
90 // Register the job and assign the ID.
91 m_jobObjects.push_back(job);
92 m_states.push_back(None);
93 m_requests.insert(rId, Request(Request::SubmitJob, bId));
94
95 return bId;
96 }
97
lookupJob(BatchId bId)98 bool BatchJob::lookupJob(BatchId bId)
99 {
100 ServerId sId = serverId(static_cast<BatchId>(bId));
101 if (sId == InvalidServerId)
102 return false;
103
104 // Verify that molequeue is running:
105 MoleQueueManager& mqManager = MoleQueueManager::instance();
106 if (!mqManager.connectIfNeeded())
107 return false;
108
109 Client& client = mqManager.client();
110 RequestId rId = client.lookupJob(sId);
111 m_requests.insert(rId, Request(Request::LookupJob, bId));
112 return true;
113 }
114
handleSubmissionReply(int rId,unsigned int sId)115 void BatchJob::handleSubmissionReply(int rId, unsigned int sId)
116 {
117 Request req = m_requests.value(rId);
118 if (req.isValid()) {
119 m_requests.remove(rId);
120 if (req.batchId >= m_jobObjects.size()) {
121 qWarning() << "BatchJob::handleSubmissionReply(): batchID out of range.";
122 return;
123 }
124 m_jobObjects[req.batchId].setValue("moleQueueId",
125 QVariant(static_cast<ServerId>(sId)));
126 m_serverIds.insert(sId, req.batchId);
127 // Request full job details:
128 lookupJob(req.batchId);
129 }
130 }
131
handleJobStateChange(unsigned int sId,const QString &,const QString &)132 void BatchJob::handleJobStateChange(unsigned int sId, const QString&,
133 const QString&)
134 {
135 BatchId bId = m_serverIds.value(static_cast<ServerId>(sId), InvalidBatchId);
136 if (bId == InvalidBatchId)
137 return;
138 // Update full job details:
139 lookupJob(bId);
140 }
141
handleLookupJobReply(int rId,const QJsonObject & jobInfo)142 void BatchJob::handleLookupJobReply(int rId, const QJsonObject& jobInfo)
143 {
144 Request req = m_requests.value(rId);
145 if (req.isValid()) {
146 m_requests.remove(rId);
147 if (req.batchId >= m_jobObjects.size()) {
148 qWarning() << "BatchJob::handleSubmissionReply(): batchID out of range.";
149 return;
150 }
151 JobObject& job(m_jobObjects[req.batchId]);
152 job.fromJson(jobInfo);
153
154 JobState oldState = m_states[req.batchId];
155 JobState newState = stringToState(job.value("jobState").toString());
156 m_states[req.batchId] = newState;
157 emit jobUpdated(req.batchId, true);
158 if (!isTerminal(oldState) && isTerminal(newState))
159 emit jobCompleted(req.batchId, newState);
160 }
161 }
162
handleErrorResponse(int requestId,int errorCode,const QString & errorMessage,const QJsonValue & errorData)163 void BatchJob::handleErrorResponse(int requestId, int errorCode,
164 const QString& errorMessage,
165 const QJsonValue& errorData)
166 {
167 qDebug() << "Error rcv'd: {"
168 << "requestId:" << requestId << "errorCode:" << errorCode
169 << "errorMessage:" << errorMessage << "errorData:" << errorData
170 << "}";
171
172 Request req = m_requests.value(requestId);
173
174 if (!req.isValid())
175 return;
176
177 m_requests.remove(requestId);
178
179 if (req.batchId < m_jobObjects.size())
180 return;
181
182 switch (req.type) {
183 case Request::SubmitJob:
184 // The job was rejected:
185 qDebug() << "Batch job" << req.batchId << "was rejected by MoleQueue.";
186 m_states[req.batchId] = Rejected;
187 m_jobObjects[req.batchId].fromJson(QJsonObject());
188 break;
189 case Request::LookupJob:
190 qDebug() << "Batch job" << req.batchId << "failed to update.";
191 emit jobUpdated(req.batchId, false);
192 break;
193 default:
194 case Request::InvalidType:
195 break;
196 }
197 }
198
setup()199 void BatchJob::setup()
200 {
201 static bool metaTypesRegistered = false;
202 if (!metaTypesRegistered) {
203 qRegisterMetaType<BatchId>("Avogadro::QtGui::BatchJob::BatchId");
204 qRegisterMetaType<BatchId>("BatchId");
205 qRegisterMetaType<ServerId>("Avogadro::QtGui::BatchJob::ServerId");
206 qRegisterMetaType<ServerId>("ServerId");
207 qRegisterMetaType<RequestId>("Avogadro::QtGui::BatchJob::RequestId");
208 qRegisterMetaType<RequestId>("RequestId");
209 metaTypesRegistered = true;
210 }
211
212 MoleQueueManager& mqManager = MoleQueueManager::instance();
213 Client& client = mqManager.client();
214 connect(&client, SIGNAL(submitJobResponse(int, uint)),
215 SLOT(handleSubmissionReply(int, uint)));
216 connect(&client, SIGNAL(lookupJobResponse(int, QJsonObject)),
217 SLOT(handleLookupJobReply(int, QJsonObject)));
218 connect(&client, SIGNAL(jobStateChanged(uint, QString, QString)),
219 SLOT(handleJobStateChange(uint, QString, QString)));
220 connect(&client, SIGNAL(errorReceived(int, int, QString, QJsonValue)),
221 SLOT(handleErrorResponse(int, int, QString, QJsonValue)));
222 }
223
224 } // namespace MoleQueue
225 } // namespace Avogadro
226