1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "WevoteWorker.h"
23
24 #include <QFileInfo>
25
26 #include <U2Core/FailTask.h>
27 #include <U2Core/FileAndDirectoryUtils.h>
28 #include <U2Core/GUrlUtils.h>
29 #include <U2Core/TaskSignalMapper.h>
30 #include <U2Core/U2OpStatusUtils.h>
31
32 #include <U2Lang/BaseSlots.h>
33 #include <U2Lang/WorkflowMonitor.h>
34
35 #include "../ngs_reads_classification/src/NgsReadsClassificationUtils.h"
36 #include "WevoteWorkerFactory.h"
37
38 namespace U2 {
39 namespace LocalWorkflow {
40
41 const QString WevoteWorker::WEVOTE_DIR = "wevote";
42
WevoteWorker(Actor * actor)43 WevoteWorker::WevoteWorker(Actor *actor)
44 : BaseWorker(actor),
45 input(nullptr),
46 output(nullptr) {
47 }
48
init()49 void WevoteWorker::init() {
50 input = ports.value(WevoteWorkerFactory::INPUT_PORT_ID);
51 output = ports.value(WevoteWorkerFactory::OUTPUT_PORT_ID);
52 SAFE_POINT(nullptr != input, QString("Port with id '%1' is NULL").arg(WevoteWorkerFactory::INPUT_PORT_ID), );
53 SAFE_POINT(nullptr != output, QString("Port with id '%1' is NULL").arg(WevoteWorkerFactory::OUTPUT_PORT_ID), );
54 }
55
tick()56 Task *WevoteWorker::tick() {
57 if (isReadyToRun()) {
58 U2OpStatus2Log os;
59 WevoteTaskSettings settings = getSettings(os);
60 if (os.hasError()) {
61 return new FailTask(os.getError());
62 }
63
64 WevoteTask *task = new WevoteTask(settings, context->getWorkflowProcess());
65 task->addListeners(createLogListeners());
66 connect(new TaskSignalMapper(task), SIGNAL(si_taskFinished(Task *)), SLOT(sl_taskFinished(Task *)));
67 return task;
68 }
69
70 if (dataFinished()) {
71 setDone();
72 output->setEnded();
73 }
74
75 return nullptr;
76 }
77
cleanup()78 void WevoteWorker::cleanup() {
79 }
80
sl_taskFinished(Task * task)81 void WevoteWorker::sl_taskFinished(Task *task) {
82 WevoteTask *wevoteTask = qobject_cast<WevoteTask *>(task);
83 if (!wevoteTask->isFinished() || wevoteTask->hasError() || wevoteTask->isCanceled()) {
84 return;
85 }
86
87 const QString classificationUrl = wevoteTask->getClassificationUrl();
88 const TaxonomyClassificationResult classification = wevoteTask->getClassification();
89
90 QVariantMap data;
91 data[TaxonomySupport::TAXONOMY_CLASSIFICATION_SLOT_ID] = QVariant::fromValue<TaxonomyClassificationResult>(classification);
92 output->put(Message(output->getBusType(), data));
93
94 context->getMonitor()->addOutputFile(classificationUrl, getActor()->getId());
95
96 int classifiedCount = NgsReadsClassificationUtils::countClassified(classification);
97 context->getMonitor()->addInfo(tr("There were %1 input reads, %2 reads were classified.").arg(QString::number(classification.size())).arg(QString::number(classifiedCount)), getActor()->getId(), WorkflowNotification::U2_INFO);
98 }
99
isReadyToRun() const100 bool WevoteWorker::isReadyToRun() const {
101 return input->hasMessage();
102 }
103
dataFinished() const104 bool WevoteWorker::dataFinished() const {
105 return input->isEnded();
106 }
107
getSettings(U2OpStatus & os)108 WevoteTaskSettings WevoteWorker::getSettings(U2OpStatus &os) {
109 WevoteTaskSettings settings;
110
111 settings.penalty = getValue<int>(WevoteWorkerFactory::PENALTY_ATTR_ID);
112 settings.numberOfAgreedTools = getValue<int>(WevoteWorkerFactory::NUMBER_OF_AGREED_TOOLS_ATTR_ID);
113 settings.scoreThreshold = getValue<int>(WevoteWorkerFactory::SCORE_THRESHOLD_ATTR_ID);
114 settings.numberOfThreads = getValue<int>(WevoteWorkerFactory::NUMBER_OF_THREADS_ATTR_ID);
115
116 const Message message = getMessageAndSetupScriptValues(input);
117 settings.inputFileUrl = message.getData().toMap()[BaseSlots::URL_SLOT().getId()].toString();
118 CHECK_EXT(!settings.inputFileUrl.isEmpty(), os.setError(tr("Empty input file URL in the message")), settings);
119
120 settings.workingDir = FileAndDirectoryUtils::createWorkingDir(context->workingDir(), FileAndDirectoryUtils::WORKFLOW_INTERNAL, "", context->workingDir());
121 settings.workingDir = GUrlUtils::createDirectory(settings.workingDir + WEVOTE_DIR, "_", os);
122
123 settings.outputFileUrl = getValue<QString>(WevoteWorkerFactory::OUTPUT_FILE_ATTR_ID);
124 if (settings.outputFileUrl.isEmpty()) {
125 const MessageMetadata metadata = context->getMetadataStorage().get(message.getMetadataId());
126 settings.outputFileUrl = settings.workingDir + "/" + QFileInfo(metadata.getFileUrl()).completeBaseName() + WevoteTask::SUFFIX;
127 }
128
129 return settings;
130 }
131
132 } // namespace LocalWorkflow
133 } // namespace U2
134