1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "DocWorkers.h"
23
24 #include <QScopedPointer>
25
26 #include <U2Core/AnnotationData.h>
27 #include <U2Core/AnnotationTableObject.h>
28 #include <U2Core/AppContext.h>
29 #include <U2Core/DNASequence.h>
30 #include <U2Core/DocumentModel.h>
31 #include <U2Core/FailTask.h>
32 #include <U2Core/GObjectRelationRoles.h>
33 #include <U2Core/GObjectUtils.h>
34 #include <U2Core/GUrlUtils.h>
35 #include <U2Core/IOAdapter.h>
36 #include <U2Core/IOAdapterUtils.h>
37 #include <U2Core/Log.h>
38 #include <U2Core/MultipleSequenceAlignmentImporter.h>
39 #include <U2Core/MultipleSequenceAlignmentObject.h>
40 #include <U2Core/TextObject.h>
41 #include <U2Core/U2AttributeUtils.h>
42 #include <U2Core/U2OpStatusUtils.h>
43 #include <U2Core/U2SafePoints.h>
44 #include <U2Core/U2SequenceUtils.h>
45
46 #include <U2Lang/BaseAttributes.h>
47 #include <U2Lang/BaseSlots.h>
48 #include <U2Lang/CoreLibConstants.h>
49 #include <U2Lang/Dataset.h>
50 #include <U2Lang/SharedDbUrlUtils.h>
51 #include <U2Lang/WorkflowEnv.h>
52 #include <U2Lang/WorkflowUtils.h>
53
54 #include "GenericReadWorker.h"
55
56 namespace U2 {
57 namespace LocalWorkflow {
58
// Counter used to number the text objects created by TextWriter::data2doc ("Text 1", "Text 2", ...).
static int ct = 0;

// TextReader buffer sizes in bytes: the maximum number of bytes read per line in
// "read by lines" mode, and the block size used when reading a whole file at once.
const int TextReader::MAX_LINE_LEN = 1024;
const int TextReader::READ_BLOCK_SIZE = 1024;
63
64 /*************************************
65 * TextReader
66 *************************************/
TextReader(Actor * a)67 TextReader::TextReader(Actor *a)
68 : BaseWorker(a), ch(nullptr), io(nullptr), urls(nullptr) {
69 mtype = WorkflowEnv::getDataTypeRegistry()->getById(CoreLibConstants::TEXT_TYPESET_ID);
70 }
71
init()72 void TextReader::init() {
73 QList<Dataset> sets = actor->getParameter(BaseAttributes::URL_IN_ATTRIBUTE().getId())->getAttributeValue<QList<Dataset>>(context);
74 urls = new DatasetFilesIterator(sets);
75
76 assert(ports.size() == 1);
77 ch = ports.values().first();
78 }
79
cleanup()80 void TextReader::cleanup() {
81 delete io;
82 delete urls;
83 }
84
sendMessage(const QByteArray & data)85 void TextReader::sendMessage(const QByteArray &data) {
86 QVariantMap m;
87 m[BaseSlots::TEXT_SLOT().getId()] = QString(data);
88 m[BaseSlots::URL_SLOT().getId()] = url;
89 m[BaseSlots::DATASET_SLOT().getId()] = urls->getLastDatasetName();
90 MessageMetadata metadata(url, urls->getLastDatasetName());
91 context->getMetadataStorage().put(metadata);
92 ch->put(Message(mtype, m, metadata.getId()));
93 }
94
tick()95 Task *TextReader::tick() {
96 if (nullptr != io && io->isOpen()) {
97 processNextLine();
98 } else if (urls->hasNext()) {
99 url = urls->getNextFile();
100 Task *resultTask = processUrlEntity(url);
101 if (nullptr != resultTask) {
102 return resultTask;
103 }
104 }
105 if (!urls->hasNext() && (nullptr == io || !io->isOpen())) {
106 ch->setEnded();
107 setDone();
108 }
109 return nullptr;
110 }
111
processUrlEntity(const QString & url)112 Task *TextReader::processUrlEntity(const QString &url) {
113 return SharedDbUrlUtils::isDbObjectUrl(url) ? processDbObject(url) : processFile(url);
114 }
115
processDbObject(const QString & url)116 Task *TextReader::processDbObject(const QString &url) {
117 const U2DataId objDbId = SharedDbUrlUtils::getObjectIdByUrl(url);
118 CHECK(!objDbId.isEmpty(), createDbObjectReadFailTask(url));
119 const U2DbiRef dbRef = SharedDbUrlUtils::getDbRefFromEntityUrl(url);
120 CHECK(dbRef.isValid(), createDbObjectReadFailTask(url));
121 const QString objDbName = SharedDbUrlUtils::getDbObjectNameByUrl(url);
122 CHECK(!objDbName.isEmpty(), createDbObjectReadFailTask(url));
123
124 QScopedPointer<TextObject> obj(qobject_cast<TextObject *>(GObjectUtils::createObject(dbRef, objDbId, objDbName)));
125 CHECK(!obj.isNull(), createDbObjectReadFailTask(url));
126 sendMessage(obj->getText().toLocal8Bit());
127
128 return nullptr;
129 }
130
createDbObjectReadFailTask(const QString & url)131 Task *TextReader::createDbObjectReadFailTask(const QString &url) {
132 const QString objName = SharedDbUrlUtils::getDbObjectNameByUrl(url);
133 const QString dbShortName = SharedDbUrlUtils::getDbShortNameFromEntityUrl(url);
134 return new FailTask(tr("Can't load the object %1 from the database %2").arg(objName).arg(dbShortName));
135 }
136
processFile(const QString & url)137 Task *TextReader::processFile(const QString &url) {
138 IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(IOAdapterUtils::url2io(url));
139 io = iof->createIOAdapter();
140 if (!io->open(url, IOAdapterMode_Read)) {
141 return new FailTask(tr("Can't load file %1").arg(url));
142 }
143 if (actor->getParameter(BaseAttributes::READ_BY_LINES_ATTRIBUTE().getId())->getAttributeValue<bool>(context) == false) {
144 QByteArray buf;
145 int read = 0;
146 int offs = 0;
147 buf.resize(READ_BLOCK_SIZE);
148 buf.fill(0);
149 do {
150 read = io->readBlock(buf.data() + offs, READ_BLOCK_SIZE);
151 if (read == -1) {
152 return new FailTask(tr("Can't load file %1. %2").arg(url).arg(io->errorString()));
153 }
154 if (read != READ_BLOCK_SIZE) {
155 SAFE_POINT(read < READ_BLOCK_SIZE, "Error while reading file", nullptr);
156 buf.resize(buf.size() - READ_BLOCK_SIZE + read);
157 break;
158 }
159 offs += read;
160 buf.resize(offs + READ_BLOCK_SIZE);
161 } while (read == READ_BLOCK_SIZE);
162
163 sendMessage(buf);
164 io->close();
165 } else {
166 processNextLine();
167 }
168 return nullptr;
169 }
170
processNextLine()171 void TextReader::processNextLine() {
172 QByteArray buf;
173 buf.resize(MAX_LINE_LEN);
174 buf.fill(0);
175 int read = io->readLine(buf.data(), MAX_LINE_LEN);
176 buf.resize(read);
177 sendMessage(buf);
178 if (io->isEof()) {
179 io->close();
180 }
181 }
182
183 /*************************************
184 * TextWriter
185 *************************************/
data2doc(Document * doc,const QVariantMap & data)186 void TextWriter::data2doc(Document *doc, const QVariantMap &data) {
187 QStringList list = data.value(BaseSlots::TEXT_SLOT().getId()).toStringList();
188 QString text = list.join("\n");
189 TextObject *to = qobject_cast<TextObject *>(GObjectUtils::selectOne(doc->getObjects(), GObjectTypes::TEXT, UOF_LoadedOnly));
190 if (!to) {
191 U2OpStatus2Log os;
192 to = TextObject::createInstance(text, QString("Text %1").arg(++ct), context->getDataStorage()->getDbiRef(), os);
193 CHECK_OP(os, );
194 doc->addObject(to);
195 } else {
196 to->setText(to->getText() + "\n" + text);
197 }
198 }
199
hasDataToWrite(const QVariantMap & data) const200 bool TextWriter::hasDataToWrite(const QVariantMap &data) const {
201 return data.contains(BaseSlots::TEXT_SLOT().getId());
202 }
203
getObjectsToWrite(const QVariantMap & data) const204 QSet<GObject *> TextWriter::getObjectsToWrite(const QVariantMap &data) const {
205 const QStringList text = data[BaseSlots::TEXT_SLOT().getId()].value<QStringList>();
206
207 U2OpStatusImpl os;
208 GObject *res = TextObject::createInstance(text.join("\n"), "Text", context->getDataStorage()->getDbiRef(), os);
209 SAFE_POINT_OP(os, QSet<GObject *>());
210 return QSet<GObject *>() << res;
211 }
212
isStreamingSupport() const213 bool TextWriter::isStreamingSupport() const {
214 return false;
215 }
216
isSupportedSeveralMessages() const217 bool TextWriter::isSupportedSeveralMessages() const {
218 return true;
219 }
220
221 /**
222 * It can change sequence name for setting unique object name
223 */
addSeqObject(Document * doc,DNASequence & seq)224 static U2SequenceObject *addSeqObject(Document *doc, DNASequence &seq) {
225 SAFE_POINT(nullptr != seq.alphabet, "Add sequence to document: empty alphabet", nullptr);
226 SAFE_POINT(0 != seq.length(), "Add sequence to document: empty length", nullptr);
227
228 if (doc->findGObjectByName(seq.getName())) {
229 QString uniqueName = BaseDocWriter::getUniqueObjectName(doc, seq.getName());
230 seq.setName(uniqueName);
231 }
232 algoLog.trace(QString("Adding seq [%1] to %3 doc %2").arg(seq.getName()).arg(doc->getURLString()).arg(doc->getDocumentFormat()->getFormatName()));
233
234 U2SequenceObject *dna = nullptr;
235 if (doc->getDocumentFormat()->isObjectOpSupported(doc, DocumentFormat::DocObjectOp_Add, GObjectTypes::SEQUENCE)) {
236 U2OpStatus2Log os;
237 U2EntityRef seqRef = U2SequenceUtils::import(os, doc->getDbiRef(), seq);
238 CHECK_OP(os, nullptr);
239 dna = new U2SequenceObject(seq.getName(), seqRef);
240 doc->addObject(dna);
241 } else {
242 algoLog.trace("Failed to add sequence object to document: op is not supported!");
243 }
244
245 return dna;
246 }
247
248 /*************************************
249 * FastaWriter
250 *************************************/
data2doc(Document * doc,const QVariantMap & data)251 void FastaWriter::data2doc(Document *doc, const QVariantMap &data) {
252 data2document(doc, data, context, numSplitSequences, currentSplitSequence);
253 currentSplitSequence++;
254 }
255
hasDataToWrite(const QVariantMap & data) const256 bool FastaWriter::hasDataToWrite(const QVariantMap &data) const {
257 return SeqWriter::hasSequence(data);
258 }
259
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)260 void FastaWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
261 streamingStoreEntry(format, io, data, context, entryNum);
262 currentSplitSequence++;
263 }
264
getSplitRegion(int numSplitSequences,int currentSplitSequence,qint64 seqLen)265 U2Region FastaWriter::getSplitRegion(int numSplitSequences, int currentSplitSequence, qint64 seqLen) {
266 U2Region result;
267 result.startPos = currentSplitSequence * (seqLen / numSplitSequences);
268 result.length = seqLen / numSplitSequences;
269 if (currentSplitSequence == (numSplitSequences - 1)) {
270 result.length += seqLen % numSplitSequences;
271 }
272 return result;
273 }
274
getSeqObj(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)275 static U2SequenceObject *getSeqObj(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
276 if (!data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
277 os.setError("Fasta writer: no sequence");
278 return nullptr;
279 }
280 SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
281 U2SequenceObject *seqObj = StorageUtils::getSequenceObject(context->getDataStorage(), seqId);
282 if (nullptr == seqObj) {
283 os.setError("Fasta writer: NULL sequence object");
284 }
285 return seqObj;
286 }
287
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context,int numSplitSequences,int currentSplitSequence)288 void FastaWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context, int numSplitSequences, int currentSplitSequence) {
289 U2OpStatusImpl os;
290 QScopedPointer<U2SequenceObject> seqObj(getSeqObj(data, context, os));
291 SAFE_POINT_OP(os, );
292
293 U2Region splitRegion = getSplitRegion(numSplitSequences, currentSplitSequence, seqObj->getSequenceLength());
294 QByteArray splitSequence = seqObj->getSequenceData(splitRegion, os);
295 CHECK_OP(os, );
296
297 DNASequence seq(seqObj->getSequenceName() + ((numSplitSequences == 1) ? QString("%1..%2").arg(splitRegion.startPos + 1, splitRegion.length) : ""), splitSequence, seqObj->getAlphabet());
298 seq.circular = seqObj->isCircular();
299 seq.quality = seqObj->getQuality();
300 seq.info = seqObj->getSequenceInfo();
301
302 QString sequenceName = data.value(BaseSlots::FASTA_HEADER_SLOT().getId()).toString();
303 if (sequenceName.isEmpty()) {
304 sequenceName = seq.getName();
305 if (sequenceName.isEmpty()) {
306 sequenceName = QString("unknown sequence %1").arg(doc->getObjects().size());
307 }
308 } else {
309 seq.info.insert(DNAInfo::FASTA_HDR, sequenceName);
310 }
311 seq.setName(sequenceName);
312 addSeqObject(doc, seq);
313 }
314
getCopiedSequenceObject(const QVariantMap & data,WorkflowContext * context,U2OpStatus2Log & os,const U2Region & reg=U2_REGION_MAX)315 inline static U2SequenceObject *getCopiedSequenceObject(const QVariantMap &data, WorkflowContext *context, U2OpStatus2Log &os, const U2Region ® = U2_REGION_MAX) {
316 QScopedPointer<U2SequenceObject> seqObj(getSeqObj(data, context, os));
317 SAFE_POINT_OP(os, nullptr);
318
319 SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
320 int refCount = seqId.constData()->getReferenceCount();
321 if (refCount > 2) { // need to copy because it is used by another worker
322 DNASequence seq = seqObj->getSequence(reg, os);
323 CHECK_OP(os, nullptr);
324 U2EntityRef seqRef = U2SequenceUtils::import(os, context->getDataStorage()->getDbiRef(), seq);
325 CHECK_OP(os, nullptr);
326
327 U2SequenceObject *clonedSeqObj = new U2SequenceObject(seqObj->getSequenceName(), seqRef);
328 U2AttributeUtils::copyObjectAttributes(seqObj->getEntityRef(), clonedSeqObj->getEntityRef(), os);
329
330 return clonedSeqObj;
331 } else {
332 return seqObj.take();
333 }
334 }
335
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)336 void FastaWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
337 CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
338 U2OpStatus2Log os;
339 QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
340 SAFE_POINT_OP(os, );
341
342 QString sequenceName = data.value(BaseSlots::FASTA_HEADER_SLOT().getId(), QString()).toString();
343 if (sequenceName.isEmpty()) {
344 sequenceName = seqObj->getGObjectName();
345 if (sequenceName.isEmpty()) {
346 sequenceName = QString("unknown sequence %1").arg(entryNum);
347 }
348 } else {
349 QVariantMap info = seqObj->getSequenceInfo();
350 info.insert(DNAInfo::FASTA_HDR, sequenceName);
351 seqObj->setSequenceInfo(info);
352 }
353 seqObj->setGObjectName(sequenceName);
354
355 QMap<GObjectType, QList<GObject *>> objectsMap;
356 {
357 QList<GObject *> seqs;
358 seqs << seqObj.data();
359 objectsMap[GObjectTypes::SEQUENCE] = seqs;
360 }
361 format->storeEntry(io, objectsMap, os);
362 }
363
364 /*************************************
365 * FastQWriter
366 *************************************/
data2doc(Document * doc,const QVariantMap & data)367 void FastQWriter::data2doc(Document *doc, const QVariantMap &data) {
368 data2document(doc, data, context);
369 }
370
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)371 void FastQWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
372 streamingStoreEntry(format, io, data, context, entryNum);
373 }
374
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)375 void FastQWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
376 CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
377 SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
378 QScopedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
379 SAFE_POINT(nullptr != seqObj.data(), tr("Fastq writer: NULL sequence object"), );
380
381 U2OpStatusImpl os;
382 DNASequence seq = seqObj->getWholeSequence(os);
383 SAFE_POINT_OP(os, );
384
385 if (seq.getName().isEmpty()) {
386 seq.setName(QString("unknown sequence %1").arg(doc->getObjects().size()));
387 }
388 addSeqObject(doc, seq);
389 }
390
hasDataToWrite(const QVariantMap & data) const391 bool FastQWriter::hasDataToWrite(const QVariantMap &data) const {
392 return SeqWriter::hasSequence(data);
393 }
394
getObjectsToWrite(const QVariantMap & data) const395 QSet<GObject *> FastQWriter::getObjectsToWrite(const QVariantMap &data) const {
396 return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
397 }
398
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)399 void FastQWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
400 CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
401 U2OpStatus2Log os;
402 QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
403 SAFE_POINT_OP(os, );
404
405 if (seqObj->getGObjectName().isEmpty()) {
406 seqObj->setGObjectName(QString("unknown sequence %1").arg(entryNum));
407 }
408
409 QMap<GObjectType, QList<GObject *>> objectsMap;
410 {
411 QList<GObject *> seqs;
412 seqs << seqObj.data();
413 objectsMap[GObjectTypes::SEQUENCE] = seqs;
414 }
415 format->storeEntry(io, objectsMap, os);
416 }
417
418 /*************************************
419 * RawSeqWriter
420 *************************************/
data2doc(Document * doc,const QVariantMap & data)421 void RawSeqWriter::data2doc(Document *doc, const QVariantMap &data) {
422 data2document(doc, data, context);
423 }
424
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)425 void RawSeqWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
426 streamingStoreEntry(format, io, data, context, entryNum);
427 }
428
429 // same as FastQWriter::data2document
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)430 void RawSeqWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
431 CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
432 SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
433 QScopedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
434 SAFE_POINT(nullptr != seqObj.data(), tr("Raw sequence writer: NULL sequence object"), );
435
436 U2OpStatusImpl os;
437 DNASequence seq = seqObj->getWholeSequence(os);
438 SAFE_POINT_OP(os, );
439
440 if (seq.getName().isEmpty()) {
441 seq.setName(QString("unknown sequence %1").arg(doc->getObjects().size()));
442 }
443 addSeqObject(doc, seq);
444 }
445
hasDataToWrite(const QVariantMap & data) const446 bool RawSeqWriter::hasDataToWrite(const QVariantMap &data) const {
447 return SeqWriter::hasSequence(data);
448 }
449
getObjectToWrite(const QVariantMap & data) const450 GObject *RawSeqWriter::getObjectToWrite(const QVariantMap &data) const {
451 return SeqWriter::getSeqObject(data, context);
452 }
453
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int)454 void RawSeqWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int) {
455 CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
456 U2OpStatus2Log os;
457 QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
458 SAFE_POINT_OP(os, );
459
460 QMap<GObjectType, QList<GObject *>> objectsMap;
461 {
462 QList<GObject *> seqs;
463 seqs << seqObj.data();
464 objectsMap[GObjectTypes::SEQUENCE] = seqs;
465 }
466 format->storeEntry(io, objectsMap, os);
467 }
468
469 /*************************************
470 * GenbankWriter
471 *************************************/
/**
 * Derives an annotation table name from a sequence name. Occurrences of SEQUENCE_TAG are replaced
 * with FEATURES_TAG; a name without SEQUENCE_TAG gets FEATURES_TAG appended instead.
 */
inline static QString getAnnotationName(const QString &seqName) {
    QString annotationName = seqName;
    if (!annotationName.contains(SEQUENCE_TAG)) {
        annotationName += FEATURES_TAG;
        return annotationName;
    }
    annotationName.replace(SEQUENCE_TAG, FEATURES_TAG);
    return annotationName;
}
482
data2doc(Document * doc,const QVariantMap & data)483 void GenbankWriter::data2doc(Document *doc, const QVariantMap &data) {
484 data2document(doc, data, context);
485 }
486
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)487 void GenbankWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
488 streamingStoreEntry(format, io, data, context, entryNum);
489 }
490
/**
 * Adds the message's sequence and/or annotation table to `doc` (used for GenBank output).
 *
 * Sequence (if the sequence slot is present):
 *  - sequence info values that are neither strings nor string lists are dropped;
 *  - an unnamed sequence becomes "unknown sequence N" (N = number of sequence objects already
 *    in the document); a named one also determines the annotation table name (getAnnotationName);
 *  - a sequence object with the same name already in the document is reused, otherwise a new one
 *    is added through addSeqObject().
 *
 * Annotations (if the annotation slot is present and non-empty): they are added to the first loaded
 * annotation table related to the sequence. If there is none, they go to the table found by name,
 * which is created (and linked to the sequence, if any) when missing. Without a name derived from
 * the sequence, the name is "unknown features N" (N = number of annotation tables in the document).
 */
void GenbankWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
    QScopedPointer<U2SequenceObject> seqObj(nullptr);
    // Sequence object inside `doc` that annotations should be related to (may stay null).
    U2SequenceObject *dna = nullptr;
    QString annotationName;

    if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
        SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
        seqObj.reset(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
        SAFE_POINT(nullptr != seqObj.data(), tr("Genbank writer: NULL sequence object"), );

        U2OpStatusImpl os;
        DNASequence seq = seqObj->getWholeSequence(os);
        SAFE_POINT_OP(os, );
        // Keep only string / string-list info values. QMapIterator holds its own (implicitly
        // shared) copy of the map, so removing entries from seq.info inside the loop is safe.
        QMapIterator<QString, QVariant> it(seq.info);
        while (it.hasNext()) {
            it.next();
            if (!(it.value().type() == QVariant::String || it.value().type() == QVariant::StringList)) {
                seq.info.remove(it.key());
            }
        }

        if (seq.getName().isEmpty()) {
            int num = doc->findGObjectByType(GObjectTypes::SEQUENCE).size();
            seq.setName(QString("unknown sequence %1").arg(num));
        } else {
            annotationName = getAnnotationName(seq.getName());
        }

        // Reuse a sequence object with the same name if the document already has one.
        dna = qobject_cast<U2SequenceObject *>(doc->findGObjectByName(seq.getName()));
        if (!dna && !seq.isNull()) {
            dna = addSeqObject(doc, seq);
        }
    }

    if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
        const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
        const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);

        if (!atl.isEmpty()) {
            AnnotationTableObject *att = nullptr;
            if (dna) {
                // Prefer the annotation table already related to this sequence.
                QList<GObject *> relAnns = GObjectUtils::findObjectsRelatedToObjectByRole(dna,
                                                                                          GObjectTypes::ANNOTATION_TABLE,
                                                                                          ObjectRole_Sequence,
                                                                                          doc->getObjects(),
                                                                                          UOF_LoadedOnly);
                att = relAnns.isEmpty() ? nullptr : qobject_cast<AnnotationTableObject *>(relAnns.first());
            }
            if (!att) {
                if (annotationName.isEmpty()) {
                    int featuresNum = doc->findGObjectByType(GObjectTypes::ANNOTATION_TABLE).size();
                    annotationName = QString("unknown features %1").arg(featuresNum);
                }
                // Fall back to a table with the expected name, creating it if needed.
                att = qobject_cast<AnnotationTableObject *>(doc->findGObjectByName(annotationName));
                if (att == nullptr) {
                    doc->addObject(att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef()));
                    if (dna) {
                        att->addObjectRelation(dna, ObjectRole_Sequence);
                    }
                }
                algoLog.trace(QString("Adding features [%1] to GB doc %2").arg(annotationName).arg(doc->getURLString()));
            }
            att->addAnnotations(atl);
        }
    }
}
557
hasDataToWrite(const QVariantMap & data) const558 bool GenbankWriter::hasDataToWrite(const QVariantMap &data) const {
559 return SeqWriter::hasSequenceOrAnns(data);
560 }
561
getObjectsToWrite(const QVariantMap & data) const562 QSet<GObject *> GenbankWriter::getObjectsToWrite(const QVariantMap &data) const {
563 return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
564 }
565
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)566 void GenbankWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
567 U2OpStatus2Log os;
568 QScopedPointer<U2SequenceObject> seqObj(nullptr);
569 QString annotationName;
570 if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
571 seqObj.reset(getCopiedSequenceObject(data, context, os));
572 SAFE_POINT_OP(os, );
573
574 if (seqObj->getGObjectName().isEmpty()) {
575 seqObj->setGObjectName(QString("unknown sequence %1").arg(entryNum));
576 annotationName = QString("unknown features %1").arg(entryNum);
577 } else {
578 annotationName = getAnnotationName(seqObj->getGObjectName());
579 }
580 }
581
582 QList<GObject *> anObjList;
583 if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
584 const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
585 const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);
586
587 if (!atl.isEmpty()) {
588 if (annotationName.isEmpty()) {
589 annotationName = QString("unknown features %1").arg(entryNum);
590 }
591 AnnotationTableObject *att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef());
592 anObjList << att;
593 att->addAnnotations(atl);
594 }
595 }
596
597 QMap<GObjectType, QList<GObject *>> objectsMap;
598 {
599 if (nullptr != seqObj.data()) {
600 QList<GObject *> seqs;
601 seqs << seqObj.data();
602 objectsMap[GObjectTypes::SEQUENCE] = seqs;
603 }
604 if (!anObjList.isEmpty()) {
605 objectsMap[GObjectTypes::ANNOTATION_TABLE] = anObjList;
606 }
607 }
608 CHECK(!objectsMap.isEmpty(), );
609
610 format->storeEntry(io, objectsMap, os);
611
612 foreach (GObject *o, anObjList) {
613 delete o;
614 }
615 }
616
617 /*************************************
618 * GFFWriter
619 *************************************/
data2doc(Document * doc,const QVariantMap & data)620 void GFFWriter::data2doc(Document *doc, const QVariantMap &data) {
621 data2document(doc, data, context);
622 }
623
hasDataToWrite(const QVariantMap & data) const624 bool GFFWriter::hasDataToWrite(const QVariantMap &data) const {
625 return SeqWriter::hasSequenceOrAnns(data);
626 }
627
getObjectsToWrite(const QVariantMap & data) const628 QSet<GObject *> GFFWriter::getObjectsToWrite(const QVariantMap &data) const {
629 return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
630 }
631
/**
 * Adds the message's sequence and/or annotation table to `doc` (used for GFF output).
 *
 * Sequence (if the sequence slot is present): an unnamed sequence becomes "unknown sequence N"
 * (N = number of sequence objects already in the document); a named one also determines the
 * annotation table name (getAnnotationName). A sequence object with the same name already in
 * the document is reused, otherwise a new one is added through addSeqObject().
 *
 * Annotations (if the annotation slot is present and non-empty): they are added to the first loaded
 * annotation table related to the sequence. If there is none, they go to the table found by name,
 * which is created (and linked to the sequence, if any) when missing. Without a name derived from
 * the sequence, the name is "unknown features N" (N = number of annotation tables in the document).
 * Unlike GenbankWriter::data2document, sequence info values are not filtered here.
 */
void GFFWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
    QScopedPointer<U2SequenceObject> seqObj(nullptr);
    // Sequence object inside `doc` that annotations should be related to (may stay null).
    U2SequenceObject *dna = nullptr;
    QString annotationName;
    if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
        SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
        seqObj.reset(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
        SAFE_POINT(!seqObj.isNull(), tr("GFF writer: NULL sequence object"), );

        U2OpStatusImpl os;
        DNASequence seq = seqObj->getWholeSequence(os);
        SAFE_POINT_OP(os, );
        if (seq.getName().isEmpty()) {
            int num = doc->findGObjectByType(GObjectTypes::SEQUENCE).size();
            seq.setName(QString("unknown sequence %1").arg(num));
        } else {
            annotationName = getAnnotationName(seq.getName());
        }

        // Reuse a sequence object with the same name if the document already has one.
        dna = qobject_cast<U2SequenceObject *>(doc->findGObjectByName(seq.getName()));
        if (!dna && !seq.isNull()) {
            dna = addSeqObject(doc, seq);
        }
    }

    if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
        const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
        const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);

        if (!atl.isEmpty()) {
            AnnotationTableObject *att = nullptr;
            if (nullptr != dna) {
                // Prefer the annotation table already related to this sequence.
                QList<GObject *> relAnns = GObjectUtils::findObjectsRelatedToObjectByRole(dna, GObjectTypes::ANNOTATION_TABLE, ObjectRole_Sequence, doc->getObjects(), UOF_LoadedOnly);
                att = relAnns.isEmpty() ? nullptr : qobject_cast<AnnotationTableObject *>(relAnns.first());
            }
            if (nullptr == att) {
                if (annotationName.isEmpty()) {
                    int featuresNum = doc->findGObjectByType(GObjectTypes::ANNOTATION_TABLE).size();
                    annotationName = QString("unknown features %1").arg(featuresNum);
                }
                // Fall back to a table with the expected name, creating it if needed.
                att = qobject_cast<AnnotationTableObject *>(doc->findGObjectByName(annotationName));
                if (nullptr == att) {
                    doc->addObject(att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef()));
                    if (nullptr != dna) {
                        att->addObjectRelation(dna, ObjectRole_Sequence);
                    }
                }
                algoLog.trace(QString("Adding features [%1] to GFF doc %2").arg(annotationName).arg(doc->getURLString()));
            }
            att->addAnnotations(atl);
        }
    }
}
685
686 /*************************************
687 * SeqWriter
688 *************************************/
SeqWriter(Actor * a)689 SeqWriter::SeqWriter(Actor *a)
690 : BaseDocWriter(a), numSplitSequences(1), currentSplitSequence(0) {
691 }
692
SeqWriter(Actor * a,const DocumentFormatId & fid)693 SeqWriter::SeqWriter(Actor *a, const DocumentFormatId &fid)
694 : BaseDocWriter(a, fid), numSplitSequences(1), currentSplitSequence(0) {
695 }
696
data2doc(Document * doc,const QVariantMap & data)697 void SeqWriter::data2doc(Document *doc, const QVariantMap &data) {
698 if (nullptr == format) {
699 return;
700 }
701 DocumentFormatId fid = format->getFormatId();
702 if (BaseDocumentFormats::FASTA == fid) {
703 FastaWriter::data2document(doc, data, context, numSplitSequences, currentSplitSequence);
704 currentSplitSequence++;
705 } else if (BaseDocumentFormats::PLAIN_GENBANK == fid) {
706 GenbankWriter::data2document(doc, data, context);
707 } else if (BaseDocumentFormats::FASTQ == fid) {
708 FastQWriter::data2document(doc, data, context);
709 } else if (BaseDocumentFormats::RAW_DNA_SEQUENCE == fid) {
710 RawSeqWriter::data2document(doc, data, context);
711 } else if (BaseDocumentFormats::GFF == fid) {
712 GFFWriter::data2document(doc, data, context);
713 } else {
714 assert(0);
715 ioLog.error(QString("Unknown data format for writing: %1").arg(fid));
716 }
717 }
718
hasSequence(const QVariantMap & data)719 bool SeqWriter::hasSequence(const QVariantMap &data) {
720 return data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId());
721 }
722
hasSequenceOrAnns(const QVariantMap & data)723 bool SeqWriter::hasSequenceOrAnns(const QVariantMap &data) {
724 return data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()) || data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId());
725 }
726
hasDataToWrite(const QVariantMap & data) const727 bool SeqWriter::hasDataToWrite(const QVariantMap &data) const {
728 if (nullptr != format) {
729 DocumentFormatId fid = format->getFormatId();
730 if (BaseDocumentFormats::GFF == fid || BaseDocumentFormats::PLAIN_GENBANK == fid) {
731 return hasSequenceOrAnns(data);
732 } else {
733 return hasSequence(data);
734 }
735 } else if (dstDbiRef.isValid()) {
736 return hasSequenceOrAnns(data);
737 } else {
738 return false;
739 }
740 }
741
getSeqObject(const QVariantMap & data,WorkflowContext * context)742 GObject *SeqWriter::getSeqObject(const QVariantMap &data, WorkflowContext *context) {
743 SharedDbiDataHandler objId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
744 return StorageUtils::getSequenceObject(context->getDataStorage(), objId);
745 }
746
getAnnObject(const QVariantMap & data,WorkflowContext * context)747 GObject *SeqWriter::getAnnObject(const QVariantMap &data, WorkflowContext *context) {
748 const QList<SharedAnnotationData> anns = StorageUtils::getAnnotationTable(context->getDataStorage(), data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()]);
749 CHECK(!anns.isEmpty(), nullptr);
750 QScopedPointer<U2SequenceObject> seqObj(qobject_cast<U2SequenceObject *>(getSeqObject(data, context)));
751 QString seqName = "Unknown";
752 if (!seqObj.isNull()) {
753 seqName = seqObj->getSequenceName();
754 }
755 AnnotationTableObject *annObj = new AnnotationTableObject(seqName + " features", context->getDataStorage()->getDbiRef());
756 annObj->addAnnotations(anns);
757 return annObj;
758 }
759
getObjectsToWrite(const QVariantMap & data) const760 QSet<GObject *> SeqWriter::getObjectsToWrite(const QVariantMap &data) const {
761 return QSet<GObject *>() << getSeqObject(data, context) << getAnnObject(data, context);
762 }
763
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)764 void SeqWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
765 if (format == nullptr) {
766 return;
767 }
768 DocumentFormatId fid = format->getFormatId();
769 if (fid == BaseDocumentFormats::FASTA) {
770 FastaWriter::streamingStoreEntry(format, io, data, context, entryNum);
771 currentSplitSequence++;
772 } else if (fid == BaseDocumentFormats::PLAIN_GENBANK) {
773 GenbankWriter::streamingStoreEntry(format, io, data, context, entryNum);
774 } else if (fid == BaseDocumentFormats::FASTQ) {
775 FastQWriter::streamingStoreEntry(format, io, data, context, entryNum);
776 } else if (fid == BaseDocumentFormats::RAW_DNA_SEQUENCE) {
777 RawSeqWriter::streamingStoreEntry(format, io, data, context, entryNum);
778 } else {
779 assert(0);
780 ioLog.error(QString("Unknown data format for writing: %1").arg(fid));
781 }
782 }
783
takeParameters(U2OpStatus & os)784 void SeqWriter::takeParameters(U2OpStatus &os) {
785 BaseDocWriter::takeParameters(os);
786 SAFE_POINT_OP(os, );
787
788 Attribute *splitAttr = actor->getParameter(BaseAttributes::SPLIT_SEQ_ATTRIBUTE().getId());
789 if (nullptr != format && format->getFormatId() == BaseDocumentFormats::FASTA && splitAttr != nullptr) {
790 numSplitSequences = splitAttr->getAttributeValue<int>(context);
791 } else {
792 numSplitSequences = 1;
793 }
794 }
795
takeUrlList(const QVariantMap & data,int metadataId,U2OpStatus & os)796 QStringList SeqWriter::takeUrlList(const QVariantMap &data, int metadataId, U2OpStatus &os) {
797 QStringList urls = BaseDocWriter::takeUrlList(data, metadataId, os);
798 CHECK_OP(os, urls);
799 SAFE_POINT(1 == urls.size(), "Several urls in the output attribute", urls);
800
801 SharedDbiDataHandler seqId = data.value(BaseSlots::DNA_SEQUENCE_SLOT().getId()).value<SharedDbiDataHandler>();
802 QSharedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
803
804 currentSplitSequence = 0;
805
806 if (!seqObj.data()) {
807 numSplitSequences = 1;
808 } else {
809 qint64 seqLen = seqObj.data()->getSequenceLength();
810 if (seqLen < numSplitSequences) {
811 numSplitSequences = seqLen;
812 }
813 if (0 == numSplitSequences) {
814 numSplitSequences = 1;
815 }
816 }
817
818 if (numSplitSequences > 1) {
819 QString url = urls.takeFirst();
820 for (int i = 0; i < numSplitSequences; i++) {
821 urls << GUrlUtils::insertSuffix(url, "_split" + QString::number(i + 1));
822 }
823 }
824
825 return urls;
826 }
827
isStreamingSupport() const828 bool SeqWriter::isStreamingSupport() const {
829 if (numSplitSequences > 1) {
830 return false;
831 }
832 return BaseDocWriter::isStreamingSupport();
833 }
834
835 /*************************************
836 * MSAWriter
837 *************************************/
data2doc(Document * doc,const QVariantMap & data)838 void MSAWriter::data2doc(Document *doc, const QVariantMap &data) {
839 data2document(doc, data, context);
840 }
841
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)842 void MSAWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
843 SharedDbiDataHandler msaId = data.value(BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId()).value<SharedDbiDataHandler>();
844 QScopedPointer<MultipleSequenceAlignmentObject> msaObj(StorageUtils::getMsaObject(context->getDataStorage(), msaId));
845 SAFE_POINT(!msaObj.isNull(), "NULL MSA Object!", );
846 MultipleSequenceAlignment msa = msaObj->getMsaCopy();
847
848 SAFE_POINT(!msa->isEmpty(), tr("Empty alignment passed for writing to %1").arg(doc->getURLString()), )
849
850 if (msa->getName().isEmpty()) {
851 QString name = QString(MA_OBJECT_NAME + "_%1").arg(ct);
852 msa->setName(name);
853 ct++;
854 }
855
856 U2OpStatus2Log os;
857 MultipleSequenceAlignmentObject *obj = MultipleSequenceAlignmentImporter::createAlignment(doc->getDbiRef(), msa, os);
858 CHECK_OP(os, );
859
860 doc->addObject(obj);
861 }
862
hasDataToWrite(const QVariantMap & data) const863 bool MSAWriter::hasDataToWrite(const QVariantMap &data) const {
864 return data.contains(BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId());
865 }
866
getObjectsToWrite(const QVariantMap & data) const867 QSet<GObject *> MSAWriter::getObjectsToWrite(const QVariantMap &data) const {
868 SharedDbiDataHandler objId = data[BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId()].value<SharedDbiDataHandler>();
869 return QSet<GObject *>() << StorageUtils::getMsaObject(context->getDataStorage(), objId);
870 }
871
isStreamingSupport() const872 bool MSAWriter::isStreamingSupport() const {
873 return false;
874 }
875
876 /*************************************
877 * DataWorkerFactory
878 *************************************/
createWorker(Actor * a)879 Worker *DataWorkerFactory::createWorker(Actor *a) {
880 // TODO: wtf is this??
881 // each actor must have own factory
882
883 BaseWorker *w = nullptr;
884 QString protoId = a->getProto()->getId();
885 if (CoreLibConstants::READ_TEXT_PROTO_ID == protoId) {
886 TextReader *t = new TextReader(a);
887 w = t;
888 } else if (CoreLibConstants::WRITE_TEXT_PROTO_ID == protoId) {
889 w = new TextWriter(a);
890 } else if (CoreLibConstants::WRITE_FASTA_PROTO_ID == protoId) {
891 w = new FastaWriter(a);
892 } else if (CoreLibConstants::WRITE_GENBANK_PROTO_ID == protoId) {
893 w = new GenbankWriter(a);
894 } else if (CoreLibConstants::WRITE_CLUSTAL_PROTO_ID == protoId) {
895 w = new MSAWriter(a, BaseDocumentFormats::CLUSTAL_ALN);
896 } else if (CoreLibConstants::WRITE_STOCKHOLM_PROTO_ID == protoId) {
897 w = new MSAWriter(a, BaseDocumentFormats::STOCKHOLM);
898 } else if (CoreLibConstants::GENERIC_READ_MA_PROTO_ID == protoId) {
899 w = new GenericMSAReader(a);
900 } else if (CoreLibConstants::GENERIC_READ_SEQ_PROTO_ID == protoId) {
901 w = new GenericSeqReader(a);
902 } else if (CoreLibConstants::WRITE_MSA_PROTO_ID == protoId) {
903 w = new MSAWriter(a);
904 } else if (CoreLibConstants::WRITE_SEQ_PROTO_ID == protoId) {
905 w = new SeqWriter(a);
906 } else if (CoreLibConstants::WRITE_FASTQ_PROTO_ID == protoId) {
907 w = new FastQWriter(a);
908 } else {
909 assert(0);
910 }
911 return w;
912 }
913
init()914 void DataWorkerFactory::init() {
915 DomainFactory *localDomain = WorkflowEnv::getDomainRegistry()->getById(LocalDomainFactory::ID);
916 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_FASTA_PROTO_ID));
917 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_GENBANK_PROTO_ID));
918 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::READ_TEXT_PROTO_ID));
919 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_TEXT_PROTO_ID));
920 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::GENERIC_READ_SEQ_PROTO_ID));
921 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::GENERIC_READ_MA_PROTO_ID));
922 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_CLUSTAL_PROTO_ID));
923 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_STOCKHOLM_PROTO_ID));
924 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_MSA_PROTO_ID));
925 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_SEQ_PROTO_ID));
926 localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_FASTQ_PROTO_ID));
927 }
928
929 } // namespace LocalWorkflow
930 } // namespace U2
931