1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "VcfConsensusWorker.h"
23
24 #include "samtools/TabixSupport.h"
25
26 #include <U2Core/FailTask.h>
27 #include <U2Core/U2OpStatusUtils.h>
28 #include <U2Core/U2SafePoints.h>
29
30 #include <U2Designer/DelegateEditors.h>
31
32 #include <U2Lang/ActorPrototypeRegistry.h>
33 #include <U2Lang/BaseActorCategories.h>
34 #include <U2Lang/BasePorts.h>
35 #include <U2Lang/BaseSlots.h>
36 #include <U2Lang/BaseTypes.h>
37 #include <U2Lang/WorkflowEnv.h>
38 #include <U2Lang/WorkflowMonitor.h>
39
40 #include "VcfConsensusSupport.h"
41 #include "VcfConsensusSupportTask.h"
42
43 namespace U2 {
44 namespace LocalWorkflow {
45
46 static const QString IN_PORT_ID("in-data");
47 static const QString IN_FASTA_URL_SLOT_ID("fasta");
48 static const QString IN_VCF_URL_SLOT_ID("vcf");
49 static const QString OUT_PORT_ID("out-consensus");
50 static const QString OUT_FASTA_URL_ID("consensus-url");
51
52 // VcfConsensusWorker
53
VcfConsensusWorker(Actor * a)54 VcfConsensusWorker::VcfConsensusWorker(Actor *a)
55 : BaseWorker(a),
56 inputFA(nullptr),
57 inputVcfBgzip(nullptr),
58 outputFA(nullptr) {
59 }
60
init()61 void VcfConsensusWorker::init() {
62 inputFA = ports.value(IN_PORT_ID);
63 inputVcfBgzip = ports.value(IN_VCF_URL_SLOT_ID);
64 outputFA = ports.value(OUT_PORT_ID);
65 }
66
tick()67 Task *VcfConsensusWorker::tick() {
68 if (inputFA->hasMessage()) {
69 const Message inputMessage = getMessageAndSetupScriptValues(inputFA);
70 if (inputMessage.isEmpty()) {
71 outputFA->transit();
72 return nullptr;
73 }
74 const QVariantMap data = inputMessage.getData().toMap();
75 if (!data.contains(IN_FASTA_URL_SLOT_ID)) {
76 return new FailTask(tr("Input fasta slot is empty"));
77 }
78 if (!data.contains(IN_VCF_URL_SLOT_ID)) {
79 return new FailTask(tr("Input vcf slot is empty"));
80 }
81
82 GUrl fastaURL(data.value(IN_FASTA_URL_SLOT_ID).toString());
83 GUrl vcfURL(data.value(IN_VCF_URL_SLOT_ID).toString());
84 GUrl outputURL(getValue<QString>(OUT_FASTA_URL_ID));
85
86 VcfConsensusSupportTask *t = nullptr;
87 t = new VcfConsensusSupportTask(fastaURL, vcfURL, outputURL);
88 t->addListeners(createLogListeners(2));
89 connect(t, SIGNAL(si_stateChanged()), SLOT(sl_taskFinished()));
90
91 return t;
92 } else {
93 setDone();
94 outputFA->setEnded();
95 }
96 return nullptr;
97 }
98
cleanup()99 void VcfConsensusWorker::cleanup() {
100 }
101
sl_taskFinished()102 void VcfConsensusWorker::sl_taskFinished() {
103 VcfConsensusSupportTask *t = dynamic_cast<VcfConsensusSupportTask *>(sender());
104 CHECK(t != nullptr, );
105 CHECK(t->isFinished() && !t->hasError(), );
106
107 if (t->isCanceled()) {
108 return;
109 }
110 QString outUrl = t->getResultUrl().getURLString();
111
112 QVariantMap data;
113 data[OUT_PORT_ID] = outUrl;
114 outputFA->put(Message(ports[OUT_PORT_ID]->getBusType(), data));
115
116 monitor()->addOutputFile(outUrl, getActorId());
117 }
118
119 // VcfConsensusWorkerFactory
120
121 const QString VcfConsensusWorkerFactory::ACTOR_ID("vcf-consensus");
122
init()123 void VcfConsensusWorkerFactory::init() {
124 QList<PortDescriptor *> ports;
125 {
126 Descriptor inDesc(IN_PORT_ID, VcfConsensusWorker::tr("Input FASTA and VCF"), VcfConsensusWorker::tr("Input FASTA and VCF"));
127 Descriptor inFastaDesc(IN_FASTA_URL_SLOT_ID, VcfConsensusWorker::tr("FASTA url"), VcfConsensusWorker::tr("FASTA url"));
128 Descriptor inVcfDesc(IN_VCF_URL_SLOT_ID, VcfConsensusWorker::tr("VCF url"), VcfConsensusWorker::tr("VCF url"));
129 QMap<Descriptor, DataTypePtr> inM;
130 inM[inFastaDesc] = BaseTypes::STRING_TYPE();
131 inM[inVcfDesc] = BaseTypes::STRING_TYPE();
132 ports << new PortDescriptor(inDesc, DataTypePtr(new MapDataType("in.fasta_vcf", inM)), true /*input*/);
133
134 Descriptor outDesc(OUT_PORT_ID, VcfConsensusWorker::tr("Fasta consensus url"), VcfConsensusWorker::tr("Fasta consensus url"));
135 QMap<Descriptor, DataTypePtr> outM;
136 outM[OUT_PORT_ID] = BaseTypes::STRING_TYPE();
137 ports << new PortDescriptor(outDesc, DataTypePtr(new MapDataType("out.fasta", outM)), false /*input*/, true /*multi*/);
138 }
139
140 QList<Attribute *> attrs;
141 {
142 Descriptor outAttDesc(OUT_FASTA_URL_ID, VcfConsensusWorker::tr("Output FASTA consensus"), VcfConsensusWorker::tr("The path to the output file with the result consensus."));
143 attrs << new Attribute(outAttDesc, BaseTypes::STRING_TYPE(), true);
144 }
145
146 QMap<QString, PropertyDelegate *> delegates;
147 {
148 delegates[OUT_FASTA_URL_ID] = new URLDelegate("", "", false /*multi*/, false /*isPath*/, true /*save*/);
149 }
150
151 Descriptor desc(ACTOR_ID,
152 VcfConsensusWorker::tr("Create VCF Consensus"),
153 VcfConsensusWorker::tr("Apply VCF variants to a fasta file to create consensus sequence."));
154 ActorPrototype *proto = new IntegralBusActorPrototype(desc, ports, attrs);
155 proto->setPrompter(new VcfConsensusPrompter());
156 proto->setEditor(new DelegateEditor(delegates));
157 proto->addExternalTool(VcfConsensusSupport::ET_VCF_CONSENSUS_ID);
158 proto->addExternalTool(TabixSupport::ET_TABIX_ID);
159
160 SAFE_POINT(WorkflowEnv::getProtoRegistry() != nullptr, "Workflow proto registry is NULL", );
161 WorkflowEnv::getProtoRegistry()->registerProto(BaseActorCategories::CATEGORY_VARIATION_ANALYSIS(), proto);
162
163 SAFE_POINT(WorkflowEnv::getDomainRegistry() != nullptr, "Workflow domain registry is NULL", );
164 DomainFactory *localDomain = WorkflowEnv::getDomainRegistry()->getById(LocalDomainFactory::ID);
165 localDomain->registerEntry(new VcfConsensusWorkerFactory());
166 }
167
composeRichDoc()168 QString VcfConsensusPrompter::composeRichDoc() {
169 IntegralBusPort *in = qobject_cast<IntegralBusPort *>(target->getPort(IN_PORT_ID));
170 SAFE_POINT(in != nullptr, "NULL input port", "");
171 QString fasta = getProducersOrUnset(IN_PORT_ID, IN_FASTA_URL_SLOT_ID);
172 QString vcf = getProducersOrUnset(IN_PORT_ID, IN_VCF_URL_SLOT_ID);
173 QString out = getHyperlink(OUT_FASTA_URL_ID, getURL(OUT_FASTA_URL_ID));
174
175 return tr("Apply VCF variants from <u>%1</u> to fasta file <u>%2</u> and save consensus sequence to <u>%3</u>.")
176 .arg(vcf)
177 .arg(fasta)
178 .arg(out);
179 }
180
181 } // namespace LocalWorkflow
182 } // namespace U2
183