1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "ExternalProcessWorker.h"
23
24 #include <QDateTime>
25 #include <QFile>
26 #include <QUuid>
27
28 #include <U2Core/AnnotationTableObject.h>
29 #include <U2Core/AppContext.h>
30 #include <U2Core/AppSettings.h>
31 #include <U2Core/CmdlineTaskRunner.h>
32 #include <U2Core/DNASequenceObject.h>
33 #include <U2Core/DocumentModel.h>
34 #include <U2Core/ExternalToolRegistry.h>
35 #include <U2Core/ExternalToolRunTask.h>
36 #include <U2Core/FailTask.h>
37 #include <U2Core/FileAndDirectoryUtils.h>
38 #include <U2Core/GObjectRelationRoles.h>
39 #include <U2Core/GUrlUtils.h>
40 #include <U2Core/IOAdapter.h>
41 #include <U2Core/MultipleSequenceAlignmentImporter.h>
42 #include <U2Core/MultipleSequenceAlignmentObject.h>
43 #include <U2Core/TextObject.h>
44 #include <U2Core/U2AlphabetUtils.h>
45 #include <U2Core/U2OpStatusUtils.h>
46 #include <U2Core/U2SequenceUtils.h>
47 #include <U2Core/UserApplicationsSettings.h>
48
49 #include <U2Designer/DelegateEditors.h>
50
51 #include <U2Lang/ActorPrototypeRegistry.h>
52 #include <U2Lang/BaseActorCategories.h>
53 #include <U2Lang/BaseSlots.h>
54 #include <U2Lang/BaseTypes.h>
55 #include <U2Lang/ExternalToolCfg.h>
56 #include <U2Lang/IncludedProtoFactory.h>
57 #include <U2Lang/WorkflowEnv.h>
58 #include <U2Lang/WorkflowMonitor.h>
59
60 #include "CustomExternalToolLogParser.h"
61 #include "CustomExternalToolRunTaskHelper.h"
62 #include "util/CustomWorkerUtils.h"
63
64 namespace U2 {
65 namespace LocalWorkflow {
66
67 const static QString INPUT_PORT_TYPE("input-for-");
68 const static QString OUTPUT_PORT_TYPE("output-for-");
69 static const QString OUT_PORT_ID("out");
70
init(ExternalProcessConfig * cfg)71 bool ExternalProcessWorkerFactory::init(ExternalProcessConfig *cfg) {
72 QScopedPointer<ActorPrototype> proto(IncludedProtoFactory::getExternalToolProto(cfg));
73 const bool prototypeRegistered = WorkflowEnv::getProtoRegistry()->registerProto(BaseActorCategories::CATEGORY_EXTERNAL(), proto.data());
74 CHECK(prototypeRegistered, false);
75 proto.take();
76
77 const bool factoryRegistered = IncludedProtoFactory::registerExternalToolWorker(cfg);
78 CHECK_EXT(factoryRegistered, delete WorkflowEnv::getProtoRegistry()->unregisterProto(cfg->id), false);
79
80 return true;
81 }
82
83 namespace {
toStringValue(const QVariantMap & data,U2OpStatus & os)84 static QString toStringValue(const QVariantMap &data, U2OpStatus &os) {
85 QString slot = BaseSlots::TEXT_SLOT().getId();
86 if (!data.contains(slot)) {
87 os.setError(QObject::tr("Empty text slot"));
88 return "";
89 }
90 return data[slot].value<QString>();
91 }
92
toSequence(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)93 static U2SequenceObject *toSequence(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
94 QString slot = BaseSlots::DNA_SEQUENCE_SLOT().getId();
95 if (!data.contains(slot)) {
96 os.setError(QObject::tr("Empty sequence slot"));
97 return nullptr;
98 }
99 SharedDbiDataHandler seqId = data[slot].value<SharedDbiDataHandler>();
100 U2SequenceObject *seqObj = StorageUtils::getSequenceObject(context->getDataStorage(), seqId);
101 if (nullptr == seqObj) {
102 os.setError(QObject::tr("Error with sequence object"));
103 }
104 return seqObj;
105 }
106
toAnotations(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)107 static AnnotationTableObject *toAnotations(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
108 QString slot = BaseSlots::ANNOTATION_TABLE_SLOT().getId();
109 if (!data.contains(slot)) {
110 os.setError(QObject::tr("Empty annotations slot"));
111 return nullptr;
112 }
113 const QVariant annotationsData = data[slot];
114 const QList<SharedAnnotationData> annList = StorageUtils::getAnnotationTable(context->getDataStorage(), annotationsData);
115
116 AnnotationTableObject *annsObj = new AnnotationTableObject("Annotations", context->getDataStorage()->getDbiRef());
117 annsObj->addAnnotations(annList);
118
119 return annsObj;
120 }
121
toAlignment(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)122 static MultipleSequenceAlignmentObject *toAlignment(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
123 QString slot = BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId();
124 if (!data.contains(slot)) {
125 os.setError(QObject::tr("Empty alignment slot"));
126 return nullptr;
127 }
128 SharedDbiDataHandler msaId = data[slot].value<SharedDbiDataHandler>();
129 MultipleSequenceAlignmentObject *msaObj = StorageUtils::getMsaObject(context->getDataStorage(), msaId);
130 if (nullptr == msaObj) {
131 os.setError(QObject::tr("Error with alignment object"));
132 }
133 return msaObj;
134 }
135
toText(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)136 static TextObject *toText(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
137 QString slot = BaseSlots::TEXT_SLOT().getId();
138 if (!data.contains(slot)) {
139 os.setError(QObject::tr("Empty text slot"));
140 return nullptr;
141 }
142 QString text = data[slot].value<QString>();
143 return TextObject::createInstance(text, "tmp_text_object", context->getDataStorage()->getDbiRef(), os);
144 }
145
/**
 * Builds a unique file path inside the "wd_external" temporary folder of the current process.
 * The folder is created if it does not exist yet; the file itself is not created here.
 *
 * @param extention file extension (without the leading dot)
 * @param name      name fragment included into the file name along with a fresh UUID
 */
static QString generateAndCreateURL(const QString &extention, const QString &name) {
    const QString tmpDirPath = AppContext::getAppSettings()->getUserAppsSettings()->getCurrentProcessTemporaryDirPath("wd_external");
    QDir tmpDir(tmpDirPath);
    if (!tmpDir.exists()) {
        tmpDir.mkpath(tmpDirPath);
    }
    const QString uniqueName = GUrlUtils::fixFileName(name + "_" + QUuid::createUuid().toString());
    return tmpDirPath + "/tmp" + uniqueName + "." + extention;
}
156
getFormat(const DataConfig & dataCfg,U2OpStatus & os)157 static DocumentFormat *getFormat(const DataConfig &dataCfg, U2OpStatus &os) {
158 DocumentFormat *f = AppContext::getDocumentFormatRegistry()->getFormatById(dataCfg.format);
159 if (nullptr == f) {
160 os.setError(QObject::tr("Unknown document format: %1").arg(dataCfg.format));
161 }
162 return f;
163 }
164
createDocument(const DataConfig & dataCfg,U2OpStatus & os)165 static Document *createDocument(const DataConfig &dataCfg, U2OpStatus &os) {
166 DocumentFormat *f = getFormat(dataCfg, os);
167 CHECK_OP(os, nullptr);
168
169 IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(BaseIOAdapters::LOCAL_FILE);
170 QString url = generateAndCreateURL(f->getSupportedDocumentFileExtensions().first(), dataCfg.attrName);
171 QScopedPointer<Document> d(f->createNewLoadedDocument(iof, url, os));
172 CHECK_OP(os, nullptr);
173 d->setDocumentOwnsDbiResources(false);
174 return d.take();
175 }
176
loadDocument(const QString & url,const DataConfig & dataCfg,WorkflowContext * context,U2OpStatus & os)177 static Document *loadDocument(const QString &url, const DataConfig &dataCfg, WorkflowContext *context, U2OpStatus &os) {
178 DocumentFormat *f = getFormat(dataCfg, os);
179 CHECK_OP(os, nullptr);
180
181 IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(BaseIOAdapters::LOCAL_FILE);
182 QVariantMap hints;
183 U2DbiRef dbiRef = context->getDataStorage()->getDbiRef();
184 hints.insert(DocumentFormat::DBI_REF_HINT, qVariantFromValue(dbiRef));
185 QScopedPointer<Document> d(f->loadDocument(iof, url, hints, os));
186 CHECK_OP(os, nullptr);
187 d->setDocumentOwnsDbiResources(false);
188 return d.take();
189 }
190
addObjects(Document * d,WorkflowContext * context,const DataConfig & dataCfg,const QVariantMap & data,U2OpStatus & os)191 static void addObjects(Document *d, WorkflowContext *context, const DataConfig &dataCfg, const QVariantMap &data, U2OpStatus &os) {
192 if (dataCfg.isSequence()) {
193 U2SequenceObject *seqObj = toSequence(data, context, os);
194 CHECK_OP(os, );
195 d->addObject(seqObj);
196 } else if (dataCfg.isAnnotations()) {
197 AnnotationTableObject *annsObj = toAnotations(data, context, os);
198 CHECK_OP(os, );
199 d->addObject(annsObj);
200 } else if (dataCfg.isAlignment()) {
201 MultipleSequenceAlignmentObject *msaObj = toAlignment(data, context, os);
202 CHECK_OP(os, );
203 d->addObject(msaObj);
204 } else if (dataCfg.isAnnotatedSequence()) {
205 U2SequenceObject *seqObj = toSequence(data, context, os);
206 CHECK_OP(os, );
207 d->addObject(seqObj);
208 AnnotationTableObject *annsObj = toAnotations(data, context, os);
209 CHECK_OP(os, );
210 d->addObject(annsObj);
211
212 QList<GObjectRelation> rel;
213 rel << GObjectRelation(GObjectReference(seqObj), ObjectRole_Sequence);
214 annsObj->setObjectRelations(rel);
215 } else if (dataCfg.isText()) {
216 TextObject *textObj = toText(data, context, os);
217 CHECK_OP(os, );
218 d->addObject(textObj);
219 }
220 }
221 } // namespace
222
ExternalProcessWorker(Actor * a)223 ExternalProcessWorker::ExternalProcessWorker(Actor *a)
224 : BaseWorker(a, false),
225 output(nullptr) {
226 ExternalToolCfgRegistry *reg = WorkflowEnv::getExternalCfgRegistry();
227 cfg = reg->getConfigById(actor->getProto()->getId());
228 }
229
applySpecialInternalEnvvars(QString & execString,ExternalProcessConfig * cfg)230 void ExternalProcessWorker::applySpecialInternalEnvvars(QString &execString,
231 ExternalProcessConfig *cfg) {
232 CustomWorkerUtils::commandReplaceAllSpecialByUgenePath(execString, cfg);
233 }
234
applyAttributes(QString & execString)235 void ExternalProcessWorker::applyAttributes(QString &execString) {
236 foreach (Attribute *a, actor->getAttributes()) {
237 QString attrValue = a->getAttributePureValue().toString();
238 DataTypePtr attrType = a->getAttributeType();
239 if (attrType == BaseTypes::STRING_TYPE()) {
240 attrValue = GUrlUtils::getQuotedString(attrValue);
241 }
242 bool wasReplaced = applyParamsToExecString(execString,
243 a->getId(),
244 attrValue);
245
246 if (wasReplaced) {
247 foreach (const AttributeConfig &attributeConfig, cfg->attrs) {
248 if (attributeConfig.attributeId == a->getId() && attributeConfig.flags.testFlag(AttributeConfig::AddToDashboard)) {
249 urlsForDashboard.insert(attrValue,
250 !attributeConfig.flags.testFlag(AttributeConfig::OpenWithUgene));
251 break;
252 }
253 }
254 }
255 }
256 }
257
applyParamsToExecString(QString & execString,QString parName,QString parValue)258 bool ExternalProcessWorker::applyParamsToExecString(QString &execString, QString parName, QString parValue) {
259 QRegularExpression regex = QRegularExpression(QString("((([^\\\\])|([^\\\\](\\\\\\\\)+)|(^))\\$)") + QString("(") + parName + QString(")") + (QString("(?=([^") + WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE + QString("]|$))")));
260 bool wasReplaced = false;
261
262 // Replace the params one-by-one
263 QRegularExpressionMatchIterator iter = regex.globalMatch(execString);
264 while (iter.hasNext()) {
265 QRegularExpressionMatch match = iter.next();
266 if (match.hasMatch()) {
267 QString m1 = match.captured(1);
268 int start = match.capturedStart(0);
269 int len = match.capturedLength();
270 execString.replace(start + m1.length() - 1, len - m1.length() + 1, parValue);
271 wasReplaced = true;
272
273 // We need to re-iterate as the string was changed
274 iter = regex.globalMatch(execString);
275 }
276 }
277
278 return wasReplaced;
279 }
280
applyEscapedSymbols(QString & execString)281 void ExternalProcessWorker::applyEscapedSymbols(QString &execString) {
282 // Replace escaped symbols
283 // Example:
284 // "%USUPP_JAVA% \\%USUPP_JAVA% -version \\\$\%\\\\\%\\$" ─┐
285 // "/usr/bin/java \/usr/bin/java -version $%\\%\$" <─┘
286 execString.replace(QRegularExpression("\\\\([\\\\\\%\\$])"), "\\1");
287 }
288
applyInputMessage(QString & execString,const DataConfig & dataCfg,const QVariantMap & data,U2OpStatus & os)289 QStringList ExternalProcessWorker::applyInputMessage(QString &execString, const DataConfig &dataCfg, const QVariantMap &data, U2OpStatus &os) {
290 QStringList urls;
291 QString paramValue;
292
293 if (dataCfg.isStringValue()) {
294 paramValue = GUrlUtils::getQuotedString(toStringValue(data, os));
295 CHECK_OP(os, urls);
296 } else {
297 QScopedPointer<Document> d(createDocument(dataCfg, os));
298 CHECK_OP(os, urls);
299 addObjects(d.data(), context, dataCfg, data, os);
300 CHECK_OP(os, urls);
301
302 DocumentFormat *f = getFormat(dataCfg, os);
303 CHECK_OP(os, urls);
304 f->storeDocument(d.data(), os);
305 CHECK_OP(os, urls);
306 urls << d->getURLString();
307 paramValue = GUrlUtils::getQuotedString(d->getURLString());
308 }
309
310 applyParamsToExecString(execString, dataCfg.attributeId, paramValue);
311 return urls;
312 }
313
prepareOutput(QString & execString,const DataConfig & dataCfg,U2OpStatus & os)314 QString ExternalProcessWorker::prepareOutput(QString &execString, const DataConfig &dataCfg, U2OpStatus &os) {
315 QString extension;
316
317 if (dataCfg.isFileUrl()) {
318 extension = "tmp";
319 } else {
320 DocumentFormat *f = getFormat(dataCfg, os);
321 CHECK_OP(os, "")
322 extension = f->getSupportedDocumentFileExtensions().first();
323 }
324 QString url = generateAndCreateURL(extension, dataCfg.attrName);
325 bool replaced = applyParamsToExecString(execString, dataCfg.attributeId, GUrlUtils::getQuotedString(url));
326 CHECK(replaced, "")
327
328 return url;
329 }
330
tick()331 Task *ExternalProcessWorker::tick() {
332 QString error;
333 if (!inputs.isEmpty() && finishWorkIfInputEnded(error)) {
334 if (!error.isEmpty()) {
335 return new FailTask(error);
336 } else {
337 return nullptr;
338 }
339 }
340
341 QString execString = commandLine;
342
343 int i = 0;
344 foreach (const DataConfig &dataCfg, cfg->inputs) { // write all input data to files
345 Message inputMessage = getMessageAndSetupScriptValues(inputs[i]);
346 i++;
347 QVariantMap data = inputMessage.getData().toMap();
348 U2OpStatusImpl os;
349 inputUrls << applyInputMessage(execString, dataCfg, data, os);
350 CHECK_OP(os, new FailTask(os.getError()));
351 }
352
353 QMap<QString, DataConfig> outputUrls;
354 foreach (const DataConfig &dataCfg, cfg->outputs) {
355 U2OpStatusImpl os;
356 QString url = prepareOutput(execString, dataCfg, os);
357 CHECK_OP(os, new FailTask(os.getError()));
358 if (!url.isEmpty()) {
359 outputUrls[url] = dataCfg;
360 }
361 }
362
363 // The following call must be last call in the preparing execString chain
364 // So, this is a very last step for execString:
365 // 1) function init(): the first one is substitution of the internal vars (like '%...%')
366 // 2) function init(): the second is applying attributes (something like '$...')
367 // 3) this function: apply substitutions for Input/Output
368 // 4) this function: this call replaces escaped symbols: '\$', '\%', '\\' by the '$', '%', '\'
369 applyEscapedSymbols(execString);
370
371 const QString workingDirectory = FileAndDirectoryUtils::createWorkingDir(context->workingDir(), FileAndDirectoryUtils::WORKFLOW_INTERNAL, "", context->workingDir());
372 QString externalProcessFolder = GUrlUtils::fixFileName(cfg->name).replace(' ', '_');
373 U2OpStatusImpl os;
374 const QString externalProcessWorkingDir = GUrlUtils::createDirectory(workingDirectory + externalProcessFolder, "_", os);
375 CHECK_OP(os, new FailTask(os.getError()));
376
377 LaunchExternalToolTask *task = new LaunchExternalToolTask(execString, externalProcessWorkingDir, outputUrls);
378 QList<ExternalToolListener *> listeners(createLogListeners());
379 task->addListeners(listeners);
380 connect(task, SIGNAL(si_stateChanged()), SLOT(sl_onTaskFinishied()));
381 if (listeners[0] != nullptr) {
382 listeners[0]->setToolName(cfg->name);
383 }
384 return task;
385 }
386
finishWorkIfInputEnded(QString & error)387 bool ExternalProcessWorker::finishWorkIfInputEnded(QString &error) {
388 error.clear();
389 const InputsCheckResult checkResult = checkInputBusState();
390 switch (checkResult) {
391 case ALL_INPUTS_FINISH:
392 finish();
393 return true;
394 case SOME_INPUTS_FINISH:
395 error = tr("Some inputs are finished while other still have not processed messages");
396 finish();
397 return true;
398 case ALL_INPUTS_HAVE_MESSAGE:
399 return false;
400 case INTERNAL_ERROR:
401 error = tr("An internal error has been spotted");
402 finish();
403 return true;
404 case NOT_ALL_INPUTS_HAVE_MESSAGE:
405 return false;
406 default:
407 error = tr("Unexpected result");
408 finish();
409 return true;
410 }
411 }
412
finish()413 void ExternalProcessWorker::finish() {
414 setDone();
415 if (nullptr != output) {
416 output->setEnded();
417 }
418 }
419
420 namespace {
getObject(Document * d,GObjectType t,U2OpStatus & os)421 static GObject *getObject(Document *d, GObjectType t, U2OpStatus &os) {
422 QList<GObject *> objs = d->findGObjectByType(t, UOF_LoadedAndUnloaded);
423 if (objs.isEmpty()) {
424 os.setError(QObject::tr("No target objects in the file: %1").arg(d->getURLString()));
425 return nullptr;
426 }
427 return objs.first();
428 }
429
getAlignment(Document * d,WorkflowContext * context,U2OpStatus & os)430 static SharedDbiDataHandler getAlignment(Document *d, WorkflowContext *context, U2OpStatus &os) {
431 GObject *obj = getObject(d, GObjectTypes::MULTIPLE_SEQUENCE_ALIGNMENT, os);
432 CHECK_OP(os, SharedDbiDataHandler());
433
434 MultipleSequenceAlignmentObject *msaObj = static_cast<MultipleSequenceAlignmentObject *>(obj);
435 if (nullptr == msaObj) {
436 os.setError(QObject::tr("Error with alignment object"));
437 return SharedDbiDataHandler();
438 }
439 return context->getDataStorage()->getDataHandler(msaObj->getEntityRef());
440 }
441
getAnnotations(Document * d,WorkflowContext * context,U2OpStatus & os)442 static SharedDbiDataHandler getAnnotations(Document *d, WorkflowContext *context, U2OpStatus &os) {
443 GObject *obj = getObject(d, GObjectTypes::ANNOTATION_TABLE, os);
444 CHECK_OP(os, SharedDbiDataHandler());
445
446 AnnotationTableObject *annsObj = static_cast<AnnotationTableObject *>(obj);
447 if (nullptr == annsObj) {
448 os.setError(QObject::tr("Error with annotations object"));
449 return SharedDbiDataHandler();
450 }
451 return context->getDataStorage()->getDataHandler(annsObj->getEntityRef());
452 }
453
454 } // namespace
455
sl_onTaskFinishied()456 void ExternalProcessWorker::sl_onTaskFinishied() {
457 LaunchExternalToolTask *t = qobject_cast<LaunchExternalToolTask *>(sender());
458 CHECK(t->isFinished(), );
459
460 if (inputs.isEmpty()) {
461 finish();
462 }
463
464 CHECK(!t->hasError(), );
465
466 foreach (const QString &url, urlsForDashboard.keys()) {
467 QFileInfo fileInfo(url);
468 if (fileInfo.exists()) {
469 if (fileInfo.isFile()) {
470 monitor()->addOutputFile(url, getActorId(), urlsForDashboard.value(url));
471 } else if (fileInfo.isDir()) {
472 monitor()->addOutputFolder(url, getActorId());
473 }
474 }
475 }
476
477 CHECK(output != nullptr, );
478
479 /* This variable and corresponded code parts with it
480 * are temporary created for merging sequences.
481 * When standard multiplexing/merging tools will be created
482 * then the variable and code parts must be deleted.
483 */
484 QMap<QString, QList<U2EntityRef>> seqsForMergingBySlotId;
485 QMap<QString, DataConfig> outputUrls = t->takeOutputUrls();
486 QMap<QString, DataConfig>::iterator i = outputUrls.begin();
487 QVariantMap v;
488
489 for (; i != outputUrls.end(); i++) {
490 DataConfig cfg = i.value();
491 QString url = i.key();
492
493 if (cfg.isFileUrl()) {
494 if (QFile::exists(url)) {
495 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
496 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = url;
497 context->addExternalProcessFile(url);
498 } else {
499 reportError(tr("%1 file was not created").arg(url));
500 }
501 } else {
502 U2OpStatusImpl os;
503 QScopedPointer<Document> d(loadDocument(url, cfg, context, os));
504 CHECK_OP_EXT(os, reportError(os.getError()), );
505 d->setDocumentOwnsDbiResources(false);
506
507 if (cfg.isSequence()) {
508 QList<GObject *> seqObjects = d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded);
509 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
510 QString slotId = WorkflowUtils::getSlotDescOfDatatype(dataType).getId();
511 if (seqObjects.size() == 1) {
512 GObject *obj = seqObjects.first();
513 Workflow::SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(obj->getEntityRef());
514 v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
515 } else if (1 < seqObjects.size()) {
516 QList<U2EntityRef> refs;
517 foreach (GObject *obj, seqObjects) {
518 refs << obj->getEntityRef();
519 }
520 seqsForMergingBySlotId.insert(slotId, refs);
521 }
522 } else if (cfg.isAlignment()) {
523 SharedDbiDataHandler msaId = getAlignment(d.data(), context, os);
524 CHECK_OP_EXT(os, reportError(os.getError()), );
525 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
526 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<SharedDbiDataHandler>(msaId);
527 } else if (cfg.isAnnotations()) {
528 const SharedDbiDataHandler annTableId = getAnnotations(d.data(), context, os);
529 CHECK_OP_EXT(os, reportError(os.getError()), );
530 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
531 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<SharedDbiDataHandler>(annTableId);
532 } else if (cfg.isAnnotatedSequence()) {
533 if (!d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded).isEmpty()) {
534 U2SequenceObject *seqObj = static_cast<U2SequenceObject *>(d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded).first());
535 DNASequence seq = seqObj->getWholeSequence(os);
536 CHECK_OP_EXT(os, reportError(os.getError()), );
537 seq.alphabet = U2AlphabetUtils::getById(BaseDNAAlphabetIds::RAW());
538 SharedDbiDataHandler seqId = context->getDataStorage()->putSequence(seq);
539 v[BaseSlots::DNA_SEQUENCE_SLOT().getId()] = qVariantFromValue<SharedDbiDataHandler>(seqId);
540 }
541 const SharedDbiDataHandler annTableId = getAnnotations(d.data(), context, os);
542 if (!os.hasError()) {
543 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
544 v[BaseSlots::ANNOTATION_TABLE_SLOT().getId()] = qVariantFromValue<SharedDbiDataHandler>(annTableId);
545 }
546 } else if (cfg.isText()) {
547 if (!d->findGObjectByType(GObjectTypes::TEXT, UOF_LoadedAndUnloaded).isEmpty()) {
548 TextObject *obj = static_cast<TextObject *>(d->findGObjectByType(GObjectTypes::TEXT, UOF_LoadedAndUnloaded).first());
549 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
550 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<QString>(obj->getText());
551 }
552 }
553
554 QFile::remove(url);
555 }
556 }
557
558 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(OUTPUT_PORT_TYPE + cfg->id);
559
560 if (seqsForMergingBySlotId.isEmpty()) {
561 output->put(Message(dataType, v));
562 } else if (seqsForMergingBySlotId.size() == 1) {
563 // create a message for every sequence
564 QString slotId = seqsForMergingBySlotId.keys().first();
565 const QList<U2EntityRef> &refs = seqsForMergingBySlotId.value(slotId);
566 foreach (const U2EntityRef &eRef, refs) {
567 SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(eRef);
568 v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
569 output->put(Message(dataType, v));
570 }
571 } else {
572 // merge every sequence group and send one message
573 U2SequenceImporter seqImporter = U2SequenceImporter(QVariantMap());
574 U2OpStatus2Log os;
575
576 foreach (const QString &slotId, seqsForMergingBySlotId.keys()) {
577 const QList<U2EntityRef> &refs = seqsForMergingBySlotId.value(slotId);
578 bool first = true;
579 foreach (const U2EntityRef &eRef, refs) {
580 QScopedPointer<U2SequenceObject> obj(new U2SequenceObject("tmp_name", eRef));
581 if (first) {
582 seqImporter.startSequence(os, context->getDataStorage()->getDbiRef(), U2ObjectDbi::ROOT_FOLDER, slotId, false);
583 first = false;
584 }
585 U2Region wholeSeq(0, obj->getSequenceLength());
586 seqImporter.addSequenceBlock(eRef, wholeSeq, os);
587 }
588 U2Sequence seq = seqImporter.finalizeSequenceAndValidate(os);
589 U2EntityRef eRef(context->getDataStorage()->getDbiRef(), seq.id);
590 SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(eRef);
591 v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
592 }
593 CHECK_OP(os, );
594 output->put(Message(dataType, v));
595 }
596 }
597
init()598 void ExternalProcessWorker::init() {
599 commandLine = cfg->cmdLine;
600 applySpecialInternalEnvvars(commandLine, cfg);
601 applyAttributes(commandLine);
602
603 output = ports.value(OUT_PORT_ID);
604
605 foreach (const DataConfig &input, cfg->inputs) {
606 IntegralBus *inBus = ports.value(input.attributeId);
607 inputs << inBus;
608
609 inBus->addComplement(output);
610 }
611 }
612
checkInputBusState() const613 ExternalProcessWorker::InputsCheckResult ExternalProcessWorker::checkInputBusState() const {
614 const int inputsCount = inputs.count();
615 CHECK(0 < inputsCount, ALL_INPUTS_FINISH);
616
617 int inputsWithMessagesCount = 0;
618 int finishedInputs = 0;
619 foreach (const CommunicationChannel *ch, inputs) {
620 SAFE_POINT(nullptr != ch, "Input is nullptr", INTERNAL_ERROR);
621 if (0 != ch->hasMessage()) {
622 ++inputsWithMessagesCount;
623 }
624 if (ch->isEnded()) {
625 ++finishedInputs;
626 }
627 }
628
629 if (inputsCount == inputsWithMessagesCount) {
630 return ALL_INPUTS_HAVE_MESSAGE;
631 } else if (inputsCount == finishedInputs) {
632 return ALL_INPUTS_FINISH;
633 } else if (0 < finishedInputs && 0 < inputsWithMessagesCount) {
634 return SOME_INPUTS_FINISH;
635 } else {
636 return NOT_ALL_INPUTS_HAVE_MESSAGE;
637 }
638 }
639
isReady() const640 bool ExternalProcessWorker::isReady() const {
641 CHECK(!isDone(), false);
642 if (inputs.isEmpty()) {
643 return true;
644 } else {
645 const InputsCheckResult checkResult = checkInputBusState();
646 switch (checkResult) {
647 case ALL_INPUTS_FINISH:
648 case SOME_INPUTS_FINISH:
649 case ALL_INPUTS_HAVE_MESSAGE:
650 case INTERNAL_ERROR:
651 return true; // the worker will be marked as 'done' in the 'tick' method
652 case NOT_ALL_INPUTS_HAVE_MESSAGE:
653 return false;
654 }
655 }
656 return false;
657 }
658
cleanup()659 void ExternalProcessWorker::cleanup() {
660 foreach (const QString &url, inputUrls) {
661 if (QFile::exists(url)) {
662 QFile::remove(url);
663 }
664 }
665 }
666
667 /************************************************************************/
668 /* LaunchExternalToolTask */
669 /************************************************************************/
LaunchExternalToolTask(const QString & _execString,const QString & _workingDir,const QMap<QString,DataConfig> & _outputUrls)670 LaunchExternalToolTask::LaunchExternalToolTask(const QString &_execString, const QString &_workingDir, const QMap<QString, DataConfig> &_outputUrls)
671 : Task(tr("Launch external process task"), TaskFlag_None), outputUrls(_outputUrls), execString(_execString), workingDir(_workingDir) {
672 }
673
~LaunchExternalToolTask()674 LaunchExternalToolTask::~LaunchExternalToolTask() {
675 foreach (const QString &url, outputUrls.keys()) {
676 if (QFile::exists(url)) {
677 QFile::remove(url);
678 }
679 }
680 }
681
682 #define WIN_LAUNCH_CMD_COMMAND "cmd /C "
683 #define START_WAIT_MSEC 3000
684
run()685 void LaunchExternalToolTask::run() {
686 GCOUNTER(cvar, "A task for an element with external tool is launched");
687 QProcess *externalProcess = new QProcess();
688 externalProcess->setWorkingDirectory(workingDir);
689 if (execString.contains(">")) {
690 QString output = execString.split(">").last();
691 output = output.trimmed();
692 if (output.startsWith('\"')) {
693 output = output.mid(1, output.length() - 2);
694 }
695 execString = execString.split(">").first();
696 externalProcess->setStandardOutputFile(output);
697 }
698 QScopedPointer<CustomExternalToolLogParser> logParser(new CustomExternalToolLogParser());
699 QScopedPointer<ExternalToolRunTaskHelper> helper(new CustomExternalToolRunTaskHelper(externalProcess, logParser.data(), stateInfo));
700 CHECK(listeners.size() > 0, );
701 helper->addOutputListener(listeners[0]);
702 QStringList execStringArgs = ExternalToolSupportUtils::splitCmdLineArguments(execString);
703 QString execStringProg = execStringArgs.takeAt(0);
704
705 QProcessEnvironment env = QProcessEnvironment::systemEnvironment();
706 externalProcess->setProcessEnvironment(env);
707 taskLog.details(tr("Running external process: %1").arg(execString));
708 bool startOk = WorkflowUtils::startExternalProcess(externalProcess, execStringProg, execStringArgs);
709
710 if (!startOk) {
711 stateInfo.setError(tr("Can't launch %1").arg(execString));
712 return;
713 }
714 listeners[0]->addNewLogMessage(execString, ExternalToolListener::PROGRAM_WITH_ARGUMENTS);
715 while (!externalProcess->waitForFinished(1000)) {
716 if (isCanceled()) {
717 CmdlineTaskRunner::killProcessTree(externalProcess);
718 }
719 }
720
721 QProcess::ExitStatus status = externalProcess->exitStatus();
722 int exitCode = externalProcess->exitCode();
723 if (status == QProcess::CrashExit && !hasError()) {
724 setError(tr("External process %1 exited with the following error: %2 (Code: %3)")
725 .arg(execString)
726 .arg(externalProcess->errorString())
727 .arg(exitCode));
728 } else if (status == QProcess::NormalExit && exitCode != EXIT_SUCCESS && !hasError()) {
729 setError(tr("External process %1 exited with code %2").arg(execString).arg(exitCode));
730 } else if (status == QProcess::NormalExit && exitCode == EXIT_SUCCESS && !hasError()) {
731 algoLog.details(tr("External process \"%1\" finished successfully").arg(execString));
732 }
733 }
734
takeOutputUrls()735 QMap<QString, DataConfig> LaunchExternalToolTask::takeOutputUrls() {
736 QMap<QString, DataConfig> result = outputUrls;
737 outputUrls.clear();
738 return result;
739 }
740
addListeners(const QList<ExternalToolListener * > & listenersToAdd)741 void LaunchExternalToolTask::addListeners(const QList<ExternalToolListener *> &listenersToAdd) {
742 listeners.append(listenersToAdd);
743 }
744
745 /************************************************************************/
746 /* ExternalProcessWorkerPrompter */
747 /************************************************************************/
composeRichDoc()748 QString ExternalProcessWorkerPrompter::composeRichDoc() {
749 ExternalProcessConfig *cfg = WorkflowEnv::getExternalCfgRegistry()->getConfigById(target->getProto()->getId());
750 assert(cfg);
751 QString doc(cfg->templateDescription);
752 doc.replace("\n", "<br>");
753
754 foreach (const DataConfig &dataCfg, cfg->inputs) {
755 QRegExp param(QString("\\$%1[^%2]|$").arg(dataCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
756 if (doc.contains(param)) {
757 IntegralBusPort *input = qobject_cast<IntegralBusPort *>(target->getPort(dataCfg.attributeId));
758 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(dataCfg.type);
759 if (dataCfg.type == SEQ_WITH_ANNS) {
760 dataType = BaseTypes::DNA_SEQUENCE_TYPE();
761 }
762 Actor *producer = input->getProducer(WorkflowUtils::getSlotDescOfDatatype(dataType).getId());
763 QString unsetStr = "<font color='red'>" + tr("unset") + "</font>";
764 QString producerName = tr("<u>%1</u>").arg(producer ? producer->getLabel() : unsetStr);
765 doc.replace("$" + dataCfg.attributeId, producerName);
766 }
767 }
768
769 foreach (const DataConfig &dataCfg, cfg->outputs) {
770 QRegExp param(QString("\\$%1[^%2]|$").arg(dataCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
771 if (doc.contains(param)) {
772 IntegralBusPort *output = qobject_cast<IntegralBusPort *>(target->getPort(OUT_PORT_ID));
773 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(dataCfg.type);
774 if (dataCfg.type == SEQ_WITH_ANNS) {
775 dataType = BaseTypes::DNA_SEQUENCE_TYPE();
776 }
777 QString destinations;
778 QString unsetStr = "<font color='red'>" + tr("unset") + "</font>";
779 if (!output->getLinks().isEmpty()) {
780 foreach (Port *p, output->getLinks().keys()) {
781 IntegralBusPort *ibp = qobject_cast<IntegralBusPort *>(p);
782 Actor *dest = ibp->owner();
783 destinations += tr("<u>%1</u>").arg(dest ? dest->getLabel() : unsetStr) + ",";
784 }
785 }
786 if (destinations.isEmpty()) {
787 destinations = tr("<u>%1</u>").arg(unsetStr);
788 } else {
789 destinations.resize(destinations.size() - 1); // remove last semicolon
790 }
791 doc.replace("$" + dataCfg.attributeId, destinations);
792 }
793 }
794
795 foreach (const AttributeConfig &attrCfg, cfg->attrs) {
796 QRegExp param(QString("\\$%1([^%2]|$)").arg(attrCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
797 if (doc.contains(param)) {
798 QString prm = getRequiredParam(attrCfg.attributeId);
799 doc.replace("$" + attrCfg.attributeId, getHyperlink(attrCfg.attrName, prm));
800 }
801 }
802
803 return doc;
804 }
805
806 } // namespace LocalWorkflow
807 } // namespace U2
808