1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "ExternalProcessWorker.h"
23 
24 #include <QDateTime>
25 #include <QFile>
26 #include <QUuid>
27 
28 #include <U2Core/AnnotationTableObject.h>
29 #include <U2Core/AppContext.h>
30 #include <U2Core/AppSettings.h>
31 #include <U2Core/CmdlineTaskRunner.h>
32 #include <U2Core/DNASequenceObject.h>
33 #include <U2Core/DocumentModel.h>
34 #include <U2Core/ExternalToolRegistry.h>
35 #include <U2Core/ExternalToolRunTask.h>
36 #include <U2Core/FailTask.h>
37 #include <U2Core/FileAndDirectoryUtils.h>
38 #include <U2Core/GObjectRelationRoles.h>
39 #include <U2Core/GUrlUtils.h>
40 #include <U2Core/IOAdapter.h>
41 #include <U2Core/MultipleSequenceAlignmentImporter.h>
42 #include <U2Core/MultipleSequenceAlignmentObject.h>
43 #include <U2Core/TextObject.h>
44 #include <U2Core/U2AlphabetUtils.h>
45 #include <U2Core/U2OpStatusUtils.h>
46 #include <U2Core/U2SequenceUtils.h>
47 #include <U2Core/UserApplicationsSettings.h>
48 
49 #include <U2Designer/DelegateEditors.h>
50 
51 #include <U2Lang/ActorPrototypeRegistry.h>
52 #include <U2Lang/BaseActorCategories.h>
53 #include <U2Lang/BaseSlots.h>
54 #include <U2Lang/BaseTypes.h>
55 #include <U2Lang/ExternalToolCfg.h>
56 #include <U2Lang/IncludedProtoFactory.h>
57 #include <U2Lang/WorkflowEnv.h>
58 #include <U2Lang/WorkflowMonitor.h>
59 
60 #include "CustomExternalToolLogParser.h"
61 #include "CustomExternalToolRunTaskHelper.h"
62 #include "util/CustomWorkerUtils.h"
63 
64 namespace U2 {
65 namespace LocalWorkflow {
66 
67 const static QString INPUT_PORT_TYPE("input-for-");
68 const static QString OUTPUT_PORT_TYPE("output-for-");
69 static const QString OUT_PORT_ID("out");
70 
init(ExternalProcessConfig * cfg)71 bool ExternalProcessWorkerFactory::init(ExternalProcessConfig *cfg) {
72     QScopedPointer<ActorPrototype> proto(IncludedProtoFactory::getExternalToolProto(cfg));
73     const bool prototypeRegistered = WorkflowEnv::getProtoRegistry()->registerProto(BaseActorCategories::CATEGORY_EXTERNAL(), proto.data());
74     CHECK(prototypeRegistered, false);
75     proto.take();
76 
77     const bool factoryRegistered = IncludedProtoFactory::registerExternalToolWorker(cfg);
78     CHECK_EXT(factoryRegistered, delete WorkflowEnv::getProtoRegistry()->unregisterProto(cfg->id), false);
79 
80     return true;
81 }
82 
83 namespace {
toStringValue(const QVariantMap & data,U2OpStatus & os)84 static QString toStringValue(const QVariantMap &data, U2OpStatus &os) {
85     QString slot = BaseSlots::TEXT_SLOT().getId();
86     if (!data.contains(slot)) {
87         os.setError(QObject::tr("Empty text slot"));
88         return "";
89     }
90     return data[slot].value<QString>();
91 }
92 
toSequence(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)93 static U2SequenceObject *toSequence(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
94     QString slot = BaseSlots::DNA_SEQUENCE_SLOT().getId();
95     if (!data.contains(slot)) {
96         os.setError(QObject::tr("Empty sequence slot"));
97         return nullptr;
98     }
99     SharedDbiDataHandler seqId = data[slot].value<SharedDbiDataHandler>();
100     U2SequenceObject *seqObj = StorageUtils::getSequenceObject(context->getDataStorage(), seqId);
101     if (nullptr == seqObj) {
102         os.setError(QObject::tr("Error with sequence object"));
103     }
104     return seqObj;
105 }
106 
toAnotations(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)107 static AnnotationTableObject *toAnotations(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
108     QString slot = BaseSlots::ANNOTATION_TABLE_SLOT().getId();
109     if (!data.contains(slot)) {
110         os.setError(QObject::tr("Empty annotations slot"));
111         return nullptr;
112     }
113     const QVariant annotationsData = data[slot];
114     const QList<SharedAnnotationData> annList = StorageUtils::getAnnotationTable(context->getDataStorage(), annotationsData);
115 
116     AnnotationTableObject *annsObj = new AnnotationTableObject("Annotations", context->getDataStorage()->getDbiRef());
117     annsObj->addAnnotations(annList);
118 
119     return annsObj;
120 }
121 
toAlignment(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)122 static MultipleSequenceAlignmentObject *toAlignment(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
123     QString slot = BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId();
124     if (!data.contains(slot)) {
125         os.setError(QObject::tr("Empty alignment slot"));
126         return nullptr;
127     }
128     SharedDbiDataHandler msaId = data[slot].value<SharedDbiDataHandler>();
129     MultipleSequenceAlignmentObject *msaObj = StorageUtils::getMsaObject(context->getDataStorage(), msaId);
130     if (nullptr == msaObj) {
131         os.setError(QObject::tr("Error with alignment object"));
132     }
133     return msaObj;
134 }
135 
toText(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)136 static TextObject *toText(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
137     QString slot = BaseSlots::TEXT_SLOT().getId();
138     if (!data.contains(slot)) {
139         os.setError(QObject::tr("Empty text slot"));
140         return nullptr;
141     }
142     QString text = data[slot].value<QString>();
143     return TextObject::createInstance(text, "tmp_text_object", context->getDataStorage()->getDbiRef(), os);
144 }
145 
/**
 * Builds a unique file path "<tmp>/tmp<name>_<uuid>.<extention>" inside the process temporary
 * "wd_external" directory, creating that directory if needed. The file itself is not created.
 */
static QString generateAndCreateURL(const QString &extention, const QString &name) {
    const QString tmpDirPath = AppContext::getAppSettings()->getUserAppsSettings()->getCurrentProcessTemporaryDirPath("wd_external");
    QDir tmpDir(tmpDirPath);
    if (!tmpDir.exists()) {
        tmpDir.mkpath(tmpDirPath);
    }

    const QString uniqueName = GUrlUtils::fixFileName(name + "_" + QUuid::createUuid().toString());
    return tmpDirPath + "/tmp" + uniqueName + "." + extention;
}
156 
getFormat(const DataConfig & dataCfg,U2OpStatus & os)157 static DocumentFormat *getFormat(const DataConfig &dataCfg, U2OpStatus &os) {
158     DocumentFormat *f = AppContext::getDocumentFormatRegistry()->getFormatById(dataCfg.format);
159     if (nullptr == f) {
160         os.setError(QObject::tr("Unknown document format: %1").arg(dataCfg.format));
161     }
162     return f;
163 }
164 
createDocument(const DataConfig & dataCfg,U2OpStatus & os)165 static Document *createDocument(const DataConfig &dataCfg, U2OpStatus &os) {
166     DocumentFormat *f = getFormat(dataCfg, os);
167     CHECK_OP(os, nullptr);
168 
169     IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(BaseIOAdapters::LOCAL_FILE);
170     QString url = generateAndCreateURL(f->getSupportedDocumentFileExtensions().first(), dataCfg.attrName);
171     QScopedPointer<Document> d(f->createNewLoadedDocument(iof, url, os));
172     CHECK_OP(os, nullptr);
173     d->setDocumentOwnsDbiResources(false);
174     return d.take();
175 }
176 
loadDocument(const QString & url,const DataConfig & dataCfg,WorkflowContext * context,U2OpStatus & os)177 static Document *loadDocument(const QString &url, const DataConfig &dataCfg, WorkflowContext *context, U2OpStatus &os) {
178     DocumentFormat *f = getFormat(dataCfg, os);
179     CHECK_OP(os, nullptr);
180 
181     IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(BaseIOAdapters::LOCAL_FILE);
182     QVariantMap hints;
183     U2DbiRef dbiRef = context->getDataStorage()->getDbiRef();
184     hints.insert(DocumentFormat::DBI_REF_HINT, qVariantFromValue(dbiRef));
185     QScopedPointer<Document> d(f->loadDocument(iof, url, hints, os));
186     CHECK_OP(os, nullptr);
187     d->setDocumentOwnsDbiResources(false);
188     return d.take();
189 }
190 
addObjects(Document * d,WorkflowContext * context,const DataConfig & dataCfg,const QVariantMap & data,U2OpStatus & os)191 static void addObjects(Document *d, WorkflowContext *context, const DataConfig &dataCfg, const QVariantMap &data, U2OpStatus &os) {
192     if (dataCfg.isSequence()) {
193         U2SequenceObject *seqObj = toSequence(data, context, os);
194         CHECK_OP(os, );
195         d->addObject(seqObj);
196     } else if (dataCfg.isAnnotations()) {
197         AnnotationTableObject *annsObj = toAnotations(data, context, os);
198         CHECK_OP(os, );
199         d->addObject(annsObj);
200     } else if (dataCfg.isAlignment()) {
201         MultipleSequenceAlignmentObject *msaObj = toAlignment(data, context, os);
202         CHECK_OP(os, );
203         d->addObject(msaObj);
204     } else if (dataCfg.isAnnotatedSequence()) {
205         U2SequenceObject *seqObj = toSequence(data, context, os);
206         CHECK_OP(os, );
207         d->addObject(seqObj);
208         AnnotationTableObject *annsObj = toAnotations(data, context, os);
209         CHECK_OP(os, );
210         d->addObject(annsObj);
211 
212         QList<GObjectRelation> rel;
213         rel << GObjectRelation(GObjectReference(seqObj), ObjectRole_Sequence);
214         annsObj->setObjectRelations(rel);
215     } else if (dataCfg.isText()) {
216         TextObject *textObj = toText(data, context, os);
217         CHECK_OP(os, );
218         d->addObject(textObj);
219     }
220 }
221 }  // namespace
222 
ExternalProcessWorker(Actor * a)223 ExternalProcessWorker::ExternalProcessWorker(Actor *a)
224     : BaseWorker(a, false),
225       output(nullptr) {
226     ExternalToolCfgRegistry *reg = WorkflowEnv::getExternalCfgRegistry();
227     cfg = reg->getConfigById(actor->getProto()->getId());
228 }
229 
applySpecialInternalEnvvars(QString & execString,ExternalProcessConfig * cfg)230 void ExternalProcessWorker::applySpecialInternalEnvvars(QString &execString,
231                                                         ExternalProcessConfig *cfg) {
232     CustomWorkerUtils::commandReplaceAllSpecialByUgenePath(execString, cfg);
233 }
234 
applyAttributes(QString & execString)235 void ExternalProcessWorker::applyAttributes(QString &execString) {
236     foreach (Attribute *a, actor->getAttributes()) {
237         QString attrValue = a->getAttributePureValue().toString();
238         DataTypePtr attrType = a->getAttributeType();
239         if (attrType == BaseTypes::STRING_TYPE()) {
240             attrValue = GUrlUtils::getQuotedString(attrValue);
241         }
242         bool wasReplaced = applyParamsToExecString(execString,
243                                                    a->getId(),
244                                                    attrValue);
245 
246         if (wasReplaced) {
247             foreach (const AttributeConfig &attributeConfig, cfg->attrs) {
248                 if (attributeConfig.attributeId == a->getId() && attributeConfig.flags.testFlag(AttributeConfig::AddToDashboard)) {
249                     urlsForDashboard.insert(attrValue,
250                                             !attributeConfig.flags.testFlag(AttributeConfig::OpenWithUgene));
251                     break;
252                 }
253             }
254         }
255     }
256 }
257 
applyParamsToExecString(QString & execString,QString parName,QString parValue)258 bool ExternalProcessWorker::applyParamsToExecString(QString &execString, QString parName, QString parValue) {
259     QRegularExpression regex = QRegularExpression(QString("((([^\\\\])|([^\\\\](\\\\\\\\)+)|(^))\\$)") + QString("(") + parName + QString(")") + (QString("(?=([^") + WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE + QString("]|$))")));
260     bool wasReplaced = false;
261 
262     // Replace the params one-by-one
263     QRegularExpressionMatchIterator iter = regex.globalMatch(execString);
264     while (iter.hasNext()) {
265         QRegularExpressionMatch match = iter.next();
266         if (match.hasMatch()) {
267             QString m1 = match.captured(1);
268             int start = match.capturedStart(0);
269             int len = match.capturedLength();
270             execString.replace(start + m1.length() - 1, len - m1.length() + 1, parValue);
271             wasReplaced = true;
272 
273             // We need to re-iterate as the string was changed
274             iter = regex.globalMatch(execString);
275         }
276     }
277 
278     return wasReplaced;
279 }
280 
applyEscapedSymbols(QString & execString)281 void ExternalProcessWorker::applyEscapedSymbols(QString &execString) {
282     // Replace escaped symbols
283     // Example:
284     // "%USUPP_JAVA% \\%USUPP_JAVA% -version \\\$\%\\\\\%\\$"   ─┐
285     // "/usr/bin/java \/usr/bin/java -version $%\\%\$"         <─┘
286     execString.replace(QRegularExpression("\\\\([\\\\\\%\\$])"), "\\1");
287 }
288 
applyInputMessage(QString & execString,const DataConfig & dataCfg,const QVariantMap & data,U2OpStatus & os)289 QStringList ExternalProcessWorker::applyInputMessage(QString &execString, const DataConfig &dataCfg, const QVariantMap &data, U2OpStatus &os) {
290     QStringList urls;
291     QString paramValue;
292 
293     if (dataCfg.isStringValue()) {
294         paramValue = GUrlUtils::getQuotedString(toStringValue(data, os));
295         CHECK_OP(os, urls);
296     } else {
297         QScopedPointer<Document> d(createDocument(dataCfg, os));
298         CHECK_OP(os, urls);
299         addObjects(d.data(), context, dataCfg, data, os);
300         CHECK_OP(os, urls);
301 
302         DocumentFormat *f = getFormat(dataCfg, os);
303         CHECK_OP(os, urls);
304         f->storeDocument(d.data(), os);
305         CHECK_OP(os, urls);
306         urls << d->getURLString();
307         paramValue = GUrlUtils::getQuotedString(d->getURLString());
308     }
309 
310     applyParamsToExecString(execString, dataCfg.attributeId, paramValue);
311     return urls;
312 }
313 
prepareOutput(QString & execString,const DataConfig & dataCfg,U2OpStatus & os)314 QString ExternalProcessWorker::prepareOutput(QString &execString, const DataConfig &dataCfg, U2OpStatus &os) {
315     QString extension;
316 
317     if (dataCfg.isFileUrl()) {
318         extension = "tmp";
319     } else {
320         DocumentFormat *f = getFormat(dataCfg, os);
321         CHECK_OP(os, "")
322         extension = f->getSupportedDocumentFileExtensions().first();
323     }
324     QString url = generateAndCreateURL(extension, dataCfg.attrName);
325     bool replaced = applyParamsToExecString(execString, dataCfg.attributeId, GUrlUtils::getQuotedString(url));
326     CHECK(replaced, "")
327 
328     return url;
329 }
330 
tick()331 Task *ExternalProcessWorker::tick() {
332     QString error;
333     if (!inputs.isEmpty() && finishWorkIfInputEnded(error)) {
334         if (!error.isEmpty()) {
335             return new FailTask(error);
336         } else {
337             return nullptr;
338         }
339     }
340 
341     QString execString = commandLine;
342 
343     int i = 0;
344     foreach (const DataConfig &dataCfg, cfg->inputs) {  // write all input data to files
345         Message inputMessage = getMessageAndSetupScriptValues(inputs[i]);
346         i++;
347         QVariantMap data = inputMessage.getData().toMap();
348         U2OpStatusImpl os;
349         inputUrls << applyInputMessage(execString, dataCfg, data, os);
350         CHECK_OP(os, new FailTask(os.getError()));
351     }
352 
353     QMap<QString, DataConfig> outputUrls;
354     foreach (const DataConfig &dataCfg, cfg->outputs) {
355         U2OpStatusImpl os;
356         QString url = prepareOutput(execString, dataCfg, os);
357         CHECK_OP(os, new FailTask(os.getError()));
358         if (!url.isEmpty()) {
359             outputUrls[url] = dataCfg;
360         }
361     }
362 
363     // The following call must be last call in the preparing execString chain
364     // So, this is a very last step for execString:
365     //     1) function init(): the first one is substitution of the internal vars (like '%...%')
366     //     2) function init(): the second is applying attributes (something like '$...')
367     //     3) this function: apply substitutions for Input/Output
368     //     4) this function: this call replaces escaped symbols: '\$', '\%', '\\' by the '$', '%', '\'
369     applyEscapedSymbols(execString);
370 
371     const QString workingDirectory = FileAndDirectoryUtils::createWorkingDir(context->workingDir(), FileAndDirectoryUtils::WORKFLOW_INTERNAL, "", context->workingDir());
372     QString externalProcessFolder = GUrlUtils::fixFileName(cfg->name).replace(' ', '_');
373     U2OpStatusImpl os;
374     const QString externalProcessWorkingDir = GUrlUtils::createDirectory(workingDirectory + externalProcessFolder, "_", os);
375     CHECK_OP(os, new FailTask(os.getError()));
376 
377     LaunchExternalToolTask *task = new LaunchExternalToolTask(execString, externalProcessWorkingDir, outputUrls);
378     QList<ExternalToolListener *> listeners(createLogListeners());
379     task->addListeners(listeners);
380     connect(task, SIGNAL(si_stateChanged()), SLOT(sl_onTaskFinishied()));
381     if (listeners[0] != nullptr) {
382         listeners[0]->setToolName(cfg->name);
383     }
384     return task;
385 }
386 
finishWorkIfInputEnded(QString & error)387 bool ExternalProcessWorker::finishWorkIfInputEnded(QString &error) {
388     error.clear();
389     const InputsCheckResult checkResult = checkInputBusState();
390     switch (checkResult) {
391         case ALL_INPUTS_FINISH:
392             finish();
393             return true;
394         case SOME_INPUTS_FINISH:
395             error = tr("Some inputs are finished while other still have not processed messages");
396             finish();
397             return true;
398         case ALL_INPUTS_HAVE_MESSAGE:
399             return false;
400         case INTERNAL_ERROR:
401             error = tr("An internal error has been spotted");
402             finish();
403             return true;
404         case NOT_ALL_INPUTS_HAVE_MESSAGE:
405             return false;
406         default:
407             error = tr("Unexpected result");
408             finish();
409             return true;
410     }
411 }
412 
finish()413 void ExternalProcessWorker::finish() {
414     setDone();
415     if (nullptr != output) {
416         output->setEnded();
417     }
418 }
419 
420 namespace {
getObject(Document * d,GObjectType t,U2OpStatus & os)421 static GObject *getObject(Document *d, GObjectType t, U2OpStatus &os) {
422     QList<GObject *> objs = d->findGObjectByType(t, UOF_LoadedAndUnloaded);
423     if (objs.isEmpty()) {
424         os.setError(QObject::tr("No target objects in the file: %1").arg(d->getURLString()));
425         return nullptr;
426     }
427     return objs.first();
428 }
429 
getAlignment(Document * d,WorkflowContext * context,U2OpStatus & os)430 static SharedDbiDataHandler getAlignment(Document *d, WorkflowContext *context, U2OpStatus &os) {
431     GObject *obj = getObject(d, GObjectTypes::MULTIPLE_SEQUENCE_ALIGNMENT, os);
432     CHECK_OP(os, SharedDbiDataHandler());
433 
434     MultipleSequenceAlignmentObject *msaObj = static_cast<MultipleSequenceAlignmentObject *>(obj);
435     if (nullptr == msaObj) {
436         os.setError(QObject::tr("Error with alignment object"));
437         return SharedDbiDataHandler();
438     }
439     return context->getDataStorage()->getDataHandler(msaObj->getEntityRef());
440 }
441 
getAnnotations(Document * d,WorkflowContext * context,U2OpStatus & os)442 static SharedDbiDataHandler getAnnotations(Document *d, WorkflowContext *context, U2OpStatus &os) {
443     GObject *obj = getObject(d, GObjectTypes::ANNOTATION_TABLE, os);
444     CHECK_OP(os, SharedDbiDataHandler());
445 
446     AnnotationTableObject *annsObj = static_cast<AnnotationTableObject *>(obj);
447     if (nullptr == annsObj) {
448         os.setError(QObject::tr("Error with annotations object"));
449         return SharedDbiDataHandler();
450     }
451     return context->getDataStorage()->getDataHandler(annsObj->getEntityRef());
452 }
453 
454 }  // namespace
455 
sl_onTaskFinishied()456 void ExternalProcessWorker::sl_onTaskFinishied() {
457     LaunchExternalToolTask *t = qobject_cast<LaunchExternalToolTask *>(sender());
458     CHECK(t->isFinished(), );
459 
460     if (inputs.isEmpty()) {
461         finish();
462     }
463 
464     CHECK(!t->hasError(), );
465 
466     foreach (const QString &url, urlsForDashboard.keys()) {
467         QFileInfo fileInfo(url);
468         if (fileInfo.exists()) {
469             if (fileInfo.isFile()) {
470                 monitor()->addOutputFile(url, getActorId(), urlsForDashboard.value(url));
471             } else if (fileInfo.isDir()) {
472                 monitor()->addOutputFolder(url, getActorId());
473             }
474         }
475     }
476 
477     CHECK(output != nullptr, );
478 
479     /* This variable and corresponded code parts with it
480      * are temporary created for merging sequences.
481      * When standard multiplexing/merging tools will be created
482      * then the variable and code parts must be deleted.
483      */
484     QMap<QString, QList<U2EntityRef>> seqsForMergingBySlotId;
485     QMap<QString, DataConfig> outputUrls = t->takeOutputUrls();
486     QMap<QString, DataConfig>::iterator i = outputUrls.begin();
487     QVariantMap v;
488 
489     for (; i != outputUrls.end(); i++) {
490         DataConfig cfg = i.value();
491         QString url = i.key();
492 
493         if (cfg.isFileUrl()) {
494             if (QFile::exists(url)) {
495                 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
496                 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = url;
497                 context->addExternalProcessFile(url);
498             } else {
499                 reportError(tr("%1 file was not created").arg(url));
500             }
501         } else {
502             U2OpStatusImpl os;
503             QScopedPointer<Document> d(loadDocument(url, cfg, context, os));
504             CHECK_OP_EXT(os, reportError(os.getError()), );
505             d->setDocumentOwnsDbiResources(false);
506 
507             if (cfg.isSequence()) {
508                 QList<GObject *> seqObjects = d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded);
509                 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
510                 QString slotId = WorkflowUtils::getSlotDescOfDatatype(dataType).getId();
511                 if (seqObjects.size() == 1) {
512                     GObject *obj = seqObjects.first();
513                     Workflow::SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(obj->getEntityRef());
514                     v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
515                 } else if (1 < seqObjects.size()) {
516                     QList<U2EntityRef> refs;
517                     foreach (GObject *obj, seqObjects) {
518                         refs << obj->getEntityRef();
519                     }
520                     seqsForMergingBySlotId.insert(slotId, refs);
521                 }
522             } else if (cfg.isAlignment()) {
523                 SharedDbiDataHandler msaId = getAlignment(d.data(), context, os);
524                 CHECK_OP_EXT(os, reportError(os.getError()), );
525                 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
526                 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<SharedDbiDataHandler>(msaId);
527             } else if (cfg.isAnnotations()) {
528                 const SharedDbiDataHandler annTableId = getAnnotations(d.data(), context, os);
529                 CHECK_OP_EXT(os, reportError(os.getError()), );
530                 DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
531                 v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<SharedDbiDataHandler>(annTableId);
532             } else if (cfg.isAnnotatedSequence()) {
533                 if (!d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded).isEmpty()) {
534                     U2SequenceObject *seqObj = static_cast<U2SequenceObject *>(d->findGObjectByType(GObjectTypes::SEQUENCE, UOF_LoadedAndUnloaded).first());
535                     DNASequence seq = seqObj->getWholeSequence(os);
536                     CHECK_OP_EXT(os, reportError(os.getError()), );
537                     seq.alphabet = U2AlphabetUtils::getById(BaseDNAAlphabetIds::RAW());
538                     SharedDbiDataHandler seqId = context->getDataStorage()->putSequence(seq);
539                     v[BaseSlots::DNA_SEQUENCE_SLOT().getId()] = qVariantFromValue<SharedDbiDataHandler>(seqId);
540                 }
541                 const SharedDbiDataHandler annTableId = getAnnotations(d.data(), context, os);
542                 if (!os.hasError()) {
543                     DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
544                     v[BaseSlots::ANNOTATION_TABLE_SLOT().getId()] = qVariantFromValue<SharedDbiDataHandler>(annTableId);
545                 }
546             } else if (cfg.isText()) {
547                 if (!d->findGObjectByType(GObjectTypes::TEXT, UOF_LoadedAndUnloaded).isEmpty()) {
548                     TextObject *obj = static_cast<TextObject *>(d->findGObjectByType(GObjectTypes::TEXT, UOF_LoadedAndUnloaded).first());
549                     DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(cfg.type);
550                     v[WorkflowUtils::getSlotDescOfDatatype(dataType).getId()] = qVariantFromValue<QString>(obj->getText());
551                 }
552             }
553 
554             QFile::remove(url);
555         }
556     }
557 
558     DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(OUTPUT_PORT_TYPE + cfg->id);
559 
560     if (seqsForMergingBySlotId.isEmpty()) {
561         output->put(Message(dataType, v));
562     } else if (seqsForMergingBySlotId.size() == 1) {
563         // create a message for every sequence
564         QString slotId = seqsForMergingBySlotId.keys().first();
565         const QList<U2EntityRef> &refs = seqsForMergingBySlotId.value(slotId);
566         foreach (const U2EntityRef &eRef, refs) {
567             SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(eRef);
568             v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
569             output->put(Message(dataType, v));
570         }
571     } else {
572         // merge every sequence group and send one message
573         U2SequenceImporter seqImporter = U2SequenceImporter(QVariantMap());
574         U2OpStatus2Log os;
575 
576         foreach (const QString &slotId, seqsForMergingBySlotId.keys()) {
577             const QList<U2EntityRef> &refs = seqsForMergingBySlotId.value(slotId);
578             bool first = true;
579             foreach (const U2EntityRef &eRef, refs) {
580                 QScopedPointer<U2SequenceObject> obj(new U2SequenceObject("tmp_name", eRef));
581                 if (first) {
582                     seqImporter.startSequence(os, context->getDataStorage()->getDbiRef(), U2ObjectDbi::ROOT_FOLDER, slotId, false);
583                     first = false;
584                 }
585                 U2Region wholeSeq(0, obj->getSequenceLength());
586                 seqImporter.addSequenceBlock(eRef, wholeSeq, os);
587             }
588             U2Sequence seq = seqImporter.finalizeSequenceAndValidate(os);
589             U2EntityRef eRef(context->getDataStorage()->getDbiRef(), seq.id);
590             SharedDbiDataHandler id = context->getDataStorage()->getDataHandler(eRef);
591             v[slotId] = qVariantFromValue<SharedDbiDataHandler>(id);
592         }
593         CHECK_OP(os, );
594         output->put(Message(dataType, v));
595     }
596 }
597 
init()598 void ExternalProcessWorker::init() {
599     commandLine = cfg->cmdLine;
600     applySpecialInternalEnvvars(commandLine, cfg);
601     applyAttributes(commandLine);
602 
603     output = ports.value(OUT_PORT_ID);
604 
605     foreach (const DataConfig &input, cfg->inputs) {
606         IntegralBus *inBus = ports.value(input.attributeId);
607         inputs << inBus;
608 
609         inBus->addComplement(output);
610     }
611 }
612 
checkInputBusState() const613 ExternalProcessWorker::InputsCheckResult ExternalProcessWorker::checkInputBusState() const {
614     const int inputsCount = inputs.count();
615     CHECK(0 < inputsCount, ALL_INPUTS_FINISH);
616 
617     int inputsWithMessagesCount = 0;
618     int finishedInputs = 0;
619     foreach (const CommunicationChannel *ch, inputs) {
620         SAFE_POINT(nullptr != ch, "Input is nullptr", INTERNAL_ERROR);
621         if (0 != ch->hasMessage()) {
622             ++inputsWithMessagesCount;
623         }
624         if (ch->isEnded()) {
625             ++finishedInputs;
626         }
627     }
628 
629     if (inputsCount == inputsWithMessagesCount) {
630         return ALL_INPUTS_HAVE_MESSAGE;
631     } else if (inputsCount == finishedInputs) {
632         return ALL_INPUTS_FINISH;
633     } else if (0 < finishedInputs && 0 < inputsWithMessagesCount) {
634         return SOME_INPUTS_FINISH;
635     } else {
636         return NOT_ALL_INPUTS_HAVE_MESSAGE;
637     }
638 }
639 
isReady() const640 bool ExternalProcessWorker::isReady() const {
641     CHECK(!isDone(), false);
642     if (inputs.isEmpty()) {
643         return true;
644     } else {
645         const InputsCheckResult checkResult = checkInputBusState();
646         switch (checkResult) {
647             case ALL_INPUTS_FINISH:
648             case SOME_INPUTS_FINISH:
649             case ALL_INPUTS_HAVE_MESSAGE:
650             case INTERNAL_ERROR:
651                 return true;  // the worker will be marked as 'done' in the 'tick' method
652             case NOT_ALL_INPUTS_HAVE_MESSAGE:
653                 return false;
654         }
655     }
656     return false;
657 }
658 
cleanup()659 void ExternalProcessWorker::cleanup() {
660     foreach (const QString &url, inputUrls) {
661         if (QFile::exists(url)) {
662             QFile::remove(url);
663         }
664     }
665 }
666 
667 /************************************************************************/
668 /* LaunchExternalToolTask */
669 /************************************************************************/
LaunchExternalToolTask(const QString & _execString,const QString & _workingDir,const QMap<QString,DataConfig> & _outputUrls)670 LaunchExternalToolTask::LaunchExternalToolTask(const QString &_execString, const QString &_workingDir, const QMap<QString, DataConfig> &_outputUrls)
671     : Task(tr("Launch external process task"), TaskFlag_None), outputUrls(_outputUrls), execString(_execString), workingDir(_workingDir) {
672 }
673 
~LaunchExternalToolTask()674 LaunchExternalToolTask::~LaunchExternalToolTask() {
675     foreach (const QString &url, outputUrls.keys()) {
676         if (QFile::exists(url)) {
677             QFile::remove(url);
678         }
679     }
680 }
681 
// NOTE(review): neither macro below is referenced anywhere after its definition in this file
// (run() builds the command without "cmd /C" and waits in 1000 ms slices instead of
// START_WAIT_MSEC); presumably leftovers from an earlier implementation — consider removing.
#define WIN_LAUNCH_CMD_COMMAND "cmd /C "
#define START_WAIT_MSEC 3000
684 
run()685 void LaunchExternalToolTask::run() {
686     GCOUNTER(cvar, "A task for an element with external tool is launched");
687     QProcess *externalProcess = new QProcess();
688     externalProcess->setWorkingDirectory(workingDir);
689     if (execString.contains(">")) {
690         QString output = execString.split(">").last();
691         output = output.trimmed();
692         if (output.startsWith('\"')) {
693             output = output.mid(1, output.length() - 2);
694         }
695         execString = execString.split(">").first();
696         externalProcess->setStandardOutputFile(output);
697     }
698     QScopedPointer<CustomExternalToolLogParser> logParser(new CustomExternalToolLogParser());
699     QScopedPointer<ExternalToolRunTaskHelper> helper(new CustomExternalToolRunTaskHelper(externalProcess, logParser.data(), stateInfo));
700     CHECK(listeners.size() > 0, );
701     helper->addOutputListener(listeners[0]);
702     QStringList execStringArgs = ExternalToolSupportUtils::splitCmdLineArguments(execString);
703     QString execStringProg = execStringArgs.takeAt(0);
704 
705     QProcessEnvironment env = QProcessEnvironment::systemEnvironment();
706     externalProcess->setProcessEnvironment(env);
707     taskLog.details(tr("Running external process: %1").arg(execString));
708     bool startOk = WorkflowUtils::startExternalProcess(externalProcess, execStringProg, execStringArgs);
709 
710     if (!startOk) {
711         stateInfo.setError(tr("Can't launch %1").arg(execString));
712         return;
713     }
714     listeners[0]->addNewLogMessage(execString, ExternalToolListener::PROGRAM_WITH_ARGUMENTS);
715     while (!externalProcess->waitForFinished(1000)) {
716         if (isCanceled()) {
717             CmdlineTaskRunner::killProcessTree(externalProcess);
718         }
719     }
720 
721     QProcess::ExitStatus status = externalProcess->exitStatus();
722     int exitCode = externalProcess->exitCode();
723     if (status == QProcess::CrashExit && !hasError()) {
724         setError(tr("External process %1 exited with the following error: %2 (Code: %3)")
725                      .arg(execString)
726                      .arg(externalProcess->errorString())
727                      .arg(exitCode));
728     } else if (status == QProcess::NormalExit && exitCode != EXIT_SUCCESS && !hasError()) {
729         setError(tr("External process %1 exited with code %2").arg(execString).arg(exitCode));
730     } else if (status == QProcess::NormalExit && exitCode == EXIT_SUCCESS && !hasError()) {
731         algoLog.details(tr("External process \"%1\" finished successfully").arg(execString));
732     }
733 }
734 
takeOutputUrls()735 QMap<QString, DataConfig> LaunchExternalToolTask::takeOutputUrls() {
736     QMap<QString, DataConfig> result = outputUrls;
737     outputUrls.clear();
738     return result;
739 }
740 
addListeners(const QList<ExternalToolListener * > & listenersToAdd)741 void LaunchExternalToolTask::addListeners(const QList<ExternalToolListener *> &listenersToAdd) {
742     listeners.append(listenersToAdd);
743 }
744 
745 /************************************************************************/
746 /* ExternalProcessWorkerPrompter */
747 /************************************************************************/
composeRichDoc()748 QString ExternalProcessWorkerPrompter::composeRichDoc() {
749     ExternalProcessConfig *cfg = WorkflowEnv::getExternalCfgRegistry()->getConfigById(target->getProto()->getId());
750     assert(cfg);
751     QString doc(cfg->templateDescription);
752     doc.replace("\n", "<br>");
753 
754     foreach (const DataConfig &dataCfg, cfg->inputs) {
755         QRegExp param(QString("\\$%1[^%2]|$").arg(dataCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
756         if (doc.contains(param)) {
757             IntegralBusPort *input = qobject_cast<IntegralBusPort *>(target->getPort(dataCfg.attributeId));
758             DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(dataCfg.type);
759             if (dataCfg.type == SEQ_WITH_ANNS) {
760                 dataType = BaseTypes::DNA_SEQUENCE_TYPE();
761             }
762             Actor *producer = input->getProducer(WorkflowUtils::getSlotDescOfDatatype(dataType).getId());
763             QString unsetStr = "<font color='red'>" + tr("unset") + "</font>";
764             QString producerName = tr("<u>%1</u>").arg(producer ? producer->getLabel() : unsetStr);
765             doc.replace("$" + dataCfg.attributeId, producerName);
766         }
767     }
768 
769     foreach (const DataConfig &dataCfg, cfg->outputs) {
770         QRegExp param(QString("\\$%1[^%2]|$").arg(dataCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
771         if (doc.contains(param)) {
772             IntegralBusPort *output = qobject_cast<IntegralBusPort *>(target->getPort(OUT_PORT_ID));
773             DataTypePtr dataType = WorkflowEnv::getDataTypeRegistry()->getById(dataCfg.type);
774             if (dataCfg.type == SEQ_WITH_ANNS) {
775                 dataType = BaseTypes::DNA_SEQUENCE_TYPE();
776             }
777             QString destinations;
778             QString unsetStr = "<font color='red'>" + tr("unset") + "</font>";
779             if (!output->getLinks().isEmpty()) {
780                 foreach (Port *p, output->getLinks().keys()) {
781                     IntegralBusPort *ibp = qobject_cast<IntegralBusPort *>(p);
782                     Actor *dest = ibp->owner();
783                     destinations += tr("<u>%1</u>").arg(dest ? dest->getLabel() : unsetStr) + ",";
784                 }
785             }
786             if (destinations.isEmpty()) {
787                 destinations = tr("<u>%1</u>").arg(unsetStr);
788             } else {
789                 destinations.resize(destinations.size() - 1);  // remove last semicolon
790             }
791             doc.replace("$" + dataCfg.attributeId, destinations);
792         }
793     }
794 
795     foreach (const AttributeConfig &attrCfg, cfg->attrs) {
796         QRegExp param(QString("\\$%1([^%2]|$)").arg(attrCfg.attributeId).arg(WorkflowEntityValidator::ID_ACCEPTABLE_SYMBOLS_TEMPLATE));
797         if (doc.contains(param)) {
798             QString prm = getRequiredParam(attrCfg.attributeId);
799             doc.replace("$" + attrCfg.attributeId, getHyperlink(attrCfg.attrName, prm));
800         }
801     }
802 
803     return doc;
804 }
805 
806 }  // namespace LocalWorkflow
807 }  // namespace U2
808