1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "DocWorkers.h"
23 
24 #include <QScopedPointer>
25 
26 #include <U2Core/AnnotationData.h>
27 #include <U2Core/AnnotationTableObject.h>
28 #include <U2Core/AppContext.h>
29 #include <U2Core/DNASequence.h>
30 #include <U2Core/DocumentModel.h>
31 #include <U2Core/FailTask.h>
32 #include <U2Core/GObjectRelationRoles.h>
33 #include <U2Core/GObjectUtils.h>
34 #include <U2Core/GUrlUtils.h>
35 #include <U2Core/IOAdapter.h>
36 #include <U2Core/IOAdapterUtils.h>
37 #include <U2Core/Log.h>
38 #include <U2Core/MultipleSequenceAlignmentImporter.h>
39 #include <U2Core/MultipleSequenceAlignmentObject.h>
40 #include <U2Core/TextObject.h>
41 #include <U2Core/U2AttributeUtils.h>
42 #include <U2Core/U2OpStatusUtils.h>
43 #include <U2Core/U2SafePoints.h>
44 #include <U2Core/U2SequenceUtils.h>
45 
46 #include <U2Lang/BaseAttributes.h>
47 #include <U2Lang/BaseSlots.h>
48 #include <U2Lang/CoreLibConstants.h>
49 #include <U2Lang/Dataset.h>
50 #include <U2Lang/SharedDbUrlUtils.h>
51 #include <U2Lang/WorkflowEnv.h>
52 #include <U2Lang/WorkflowUtils.h>
53 
54 #include "GenericReadWorker.h"
55 
56 namespace U2 {
57 namespace LocalWorkflow {
58 
59 static int ct = 0;
60 
61 const int TextReader::MAX_LINE_LEN = 1024;
62 const int TextReader::READ_BLOCK_SIZE = 1024;
63 
64 /*************************************
65  * TextReader
66  *************************************/
TextReader(Actor * a)67 TextReader::TextReader(Actor *a)
68     : BaseWorker(a), ch(nullptr), io(nullptr), urls(nullptr) {
69     mtype = WorkflowEnv::getDataTypeRegistry()->getById(CoreLibConstants::TEXT_TYPESET_ID);
70 }
71 
init()72 void TextReader::init() {
73     QList<Dataset> sets = actor->getParameter(BaseAttributes::URL_IN_ATTRIBUTE().getId())->getAttributeValue<QList<Dataset>>(context);
74     urls = new DatasetFilesIterator(sets);
75 
76     assert(ports.size() == 1);
77     ch = ports.values().first();
78 }
79 
cleanup()80 void TextReader::cleanup() {
81     delete io;
82     delete urls;
83 }
84 
sendMessage(const QByteArray & data)85 void TextReader::sendMessage(const QByteArray &data) {
86     QVariantMap m;
87     m[BaseSlots::TEXT_SLOT().getId()] = QString(data);
88     m[BaseSlots::URL_SLOT().getId()] = url;
89     m[BaseSlots::DATASET_SLOT().getId()] = urls->getLastDatasetName();
90     MessageMetadata metadata(url, urls->getLastDatasetName());
91     context->getMetadataStorage().put(metadata);
92     ch->put(Message(mtype, m, metadata.getId()));
93 }
94 
tick()95 Task *TextReader::tick() {
96     if (nullptr != io && io->isOpen()) {
97         processNextLine();
98     } else if (urls->hasNext()) {
99         url = urls->getNextFile();
100         Task *resultTask = processUrlEntity(url);
101         if (nullptr != resultTask) {
102             return resultTask;
103         }
104     }
105     if (!urls->hasNext() && (nullptr == io || !io->isOpen())) {
106         ch->setEnded();
107         setDone();
108     }
109     return nullptr;
110 }
111 
processUrlEntity(const QString & url)112 Task *TextReader::processUrlEntity(const QString &url) {
113     return SharedDbUrlUtils::isDbObjectUrl(url) ? processDbObject(url) : processFile(url);
114 }
115 
processDbObject(const QString & url)116 Task *TextReader::processDbObject(const QString &url) {
117     const U2DataId objDbId = SharedDbUrlUtils::getObjectIdByUrl(url);
118     CHECK(!objDbId.isEmpty(), createDbObjectReadFailTask(url));
119     const U2DbiRef dbRef = SharedDbUrlUtils::getDbRefFromEntityUrl(url);
120     CHECK(dbRef.isValid(), createDbObjectReadFailTask(url));
121     const QString objDbName = SharedDbUrlUtils::getDbObjectNameByUrl(url);
122     CHECK(!objDbName.isEmpty(), createDbObjectReadFailTask(url));
123 
124     QScopedPointer<TextObject> obj(qobject_cast<TextObject *>(GObjectUtils::createObject(dbRef, objDbId, objDbName)));
125     CHECK(!obj.isNull(), createDbObjectReadFailTask(url));
126     sendMessage(obj->getText().toLocal8Bit());
127 
128     return nullptr;
129 }
130 
createDbObjectReadFailTask(const QString & url)131 Task *TextReader::createDbObjectReadFailTask(const QString &url) {
132     const QString objName = SharedDbUrlUtils::getDbObjectNameByUrl(url);
133     const QString dbShortName = SharedDbUrlUtils::getDbShortNameFromEntityUrl(url);
134     return new FailTask(tr("Can't load the object %1 from the database %2").arg(objName).arg(dbShortName));
135 }
136 
processFile(const QString & url)137 Task *TextReader::processFile(const QString &url) {
138     IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(IOAdapterUtils::url2io(url));
139     io = iof->createIOAdapter();
140     if (!io->open(url, IOAdapterMode_Read)) {
141         return new FailTask(tr("Can't load file %1").arg(url));
142     }
143     if (actor->getParameter(BaseAttributes::READ_BY_LINES_ATTRIBUTE().getId())->getAttributeValue<bool>(context) == false) {
144         QByteArray buf;
145         int read = 0;
146         int offs = 0;
147         buf.resize(READ_BLOCK_SIZE);
148         buf.fill(0);
149         do {
150             read = io->readBlock(buf.data() + offs, READ_BLOCK_SIZE);
151             if (read == -1) {
152                 return new FailTask(tr("Can't load file %1. %2").arg(url).arg(io->errorString()));
153             }
154             if (read != READ_BLOCK_SIZE) {
155                 SAFE_POINT(read < READ_BLOCK_SIZE, "Error while reading file", nullptr);
156                 buf.resize(buf.size() - READ_BLOCK_SIZE + read);
157                 break;
158             }
159             offs += read;
160             buf.resize(offs + READ_BLOCK_SIZE);
161         } while (read == READ_BLOCK_SIZE);
162 
163         sendMessage(buf);
164         io->close();
165     } else {
166         processNextLine();
167     }
168     return nullptr;
169 }
170 
processNextLine()171 void TextReader::processNextLine() {
172     QByteArray buf;
173     buf.resize(MAX_LINE_LEN);
174     buf.fill(0);
175     int read = io->readLine(buf.data(), MAX_LINE_LEN);
176     buf.resize(read);
177     sendMessage(buf);
178     if (io->isEof()) {
179         io->close();
180     }
181 }
182 
183 /*************************************
184  * TextWriter
185  *************************************/
data2doc(Document * doc,const QVariantMap & data)186 void TextWriter::data2doc(Document *doc, const QVariantMap &data) {
187     QStringList list = data.value(BaseSlots::TEXT_SLOT().getId()).toStringList();
188     QString text = list.join("\n");
189     TextObject *to = qobject_cast<TextObject *>(GObjectUtils::selectOne(doc->getObjects(), GObjectTypes::TEXT, UOF_LoadedOnly));
190     if (!to) {
191         U2OpStatus2Log os;
192         to = TextObject::createInstance(text, QString("Text %1").arg(++ct), context->getDataStorage()->getDbiRef(), os);
193         CHECK_OP(os, );
194         doc->addObject(to);
195     } else {
196         to->setText(to->getText() + "\n" + text);
197     }
198 }
199 
hasDataToWrite(const QVariantMap & data) const200 bool TextWriter::hasDataToWrite(const QVariantMap &data) const {
201     return data.contains(BaseSlots::TEXT_SLOT().getId());
202 }
203 
getObjectsToWrite(const QVariantMap & data) const204 QSet<GObject *> TextWriter::getObjectsToWrite(const QVariantMap &data) const {
205     const QStringList text = data[BaseSlots::TEXT_SLOT().getId()].value<QStringList>();
206 
207     U2OpStatusImpl os;
208     GObject *res = TextObject::createInstance(text.join("\n"), "Text", context->getDataStorage()->getDbiRef(), os);
209     SAFE_POINT_OP(os, QSet<GObject *>());
210     return QSet<GObject *>() << res;
211 }
212 
isStreamingSupport() const213 bool TextWriter::isStreamingSupport() const {
214     return false;
215 }
216 
isSupportedSeveralMessages() const217 bool TextWriter::isSupportedSeveralMessages() const {
218     return true;
219 }
220 
221 /**
222  * It can change sequence name for setting unique object name
223  */
addSeqObject(Document * doc,DNASequence & seq)224 static U2SequenceObject *addSeqObject(Document *doc, DNASequence &seq) {
225     SAFE_POINT(nullptr != seq.alphabet, "Add sequence to document: empty alphabet", nullptr);
226     SAFE_POINT(0 != seq.length(), "Add sequence to document: empty length", nullptr);
227 
228     if (doc->findGObjectByName(seq.getName())) {
229         QString uniqueName = BaseDocWriter::getUniqueObjectName(doc, seq.getName());
230         seq.setName(uniqueName);
231     }
232     algoLog.trace(QString("Adding seq [%1] to %3 doc %2").arg(seq.getName()).arg(doc->getURLString()).arg(doc->getDocumentFormat()->getFormatName()));
233 
234     U2SequenceObject *dna = nullptr;
235     if (doc->getDocumentFormat()->isObjectOpSupported(doc, DocumentFormat::DocObjectOp_Add, GObjectTypes::SEQUENCE)) {
236         U2OpStatus2Log os;
237         U2EntityRef seqRef = U2SequenceUtils::import(os, doc->getDbiRef(), seq);
238         CHECK_OP(os, nullptr);
239         dna = new U2SequenceObject(seq.getName(), seqRef);
240         doc->addObject(dna);
241     } else {
242         algoLog.trace("Failed to add sequence object to document: op is not supported!");
243     }
244 
245     return dna;
246 }
247 
248 /*************************************
249  * FastaWriter
250  *************************************/
data2doc(Document * doc,const QVariantMap & data)251 void FastaWriter::data2doc(Document *doc, const QVariantMap &data) {
252     data2document(doc, data, context, numSplitSequences, currentSplitSequence);
253     currentSplitSequence++;
254 }
255 
hasDataToWrite(const QVariantMap & data) const256 bool FastaWriter::hasDataToWrite(const QVariantMap &data) const {
257     return SeqWriter::hasSequence(data);
258 }
259 
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)260 void FastaWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
261     streamingStoreEntry(format, io, data, context, entryNum);
262     currentSplitSequence++;
263 }
264 
getSplitRegion(int numSplitSequences,int currentSplitSequence,qint64 seqLen)265 U2Region FastaWriter::getSplitRegion(int numSplitSequences, int currentSplitSequence, qint64 seqLen) {
266     U2Region result;
267     result.startPos = currentSplitSequence * (seqLen / numSplitSequences);
268     result.length = seqLen / numSplitSequences;
269     if (currentSplitSequence == (numSplitSequences - 1)) {
270         result.length += seqLen % numSplitSequences;
271     }
272     return result;
273 }
274 
getSeqObj(const QVariantMap & data,WorkflowContext * context,U2OpStatus & os)275 static U2SequenceObject *getSeqObj(const QVariantMap &data, WorkflowContext *context, U2OpStatus &os) {
276     if (!data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
277         os.setError("Fasta writer: no sequence");
278         return nullptr;
279     }
280     SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
281     U2SequenceObject *seqObj = StorageUtils::getSequenceObject(context->getDataStorage(), seqId);
282     if (nullptr == seqObj) {
283         os.setError("Fasta writer: NULL sequence object");
284     }
285     return seqObj;
286 }
287 
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context,int numSplitSequences,int currentSplitSequence)288 void FastaWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context, int numSplitSequences, int currentSplitSequence) {
289     U2OpStatusImpl os;
290     QScopedPointer<U2SequenceObject> seqObj(getSeqObj(data, context, os));
291     SAFE_POINT_OP(os, );
292 
293     U2Region splitRegion = getSplitRegion(numSplitSequences, currentSplitSequence, seqObj->getSequenceLength());
294     QByteArray splitSequence = seqObj->getSequenceData(splitRegion, os);
295     CHECK_OP(os, );
296 
297     DNASequence seq(seqObj->getSequenceName() + ((numSplitSequences == 1) ? QString("%1..%2").arg(splitRegion.startPos + 1, splitRegion.length) : ""), splitSequence, seqObj->getAlphabet());
298     seq.circular = seqObj->isCircular();
299     seq.quality = seqObj->getQuality();
300     seq.info = seqObj->getSequenceInfo();
301 
302     QString sequenceName = data.value(BaseSlots::FASTA_HEADER_SLOT().getId()).toString();
303     if (sequenceName.isEmpty()) {
304         sequenceName = seq.getName();
305         if (sequenceName.isEmpty()) {
306             sequenceName = QString("unknown sequence %1").arg(doc->getObjects().size());
307         }
308     } else {
309         seq.info.insert(DNAInfo::FASTA_HDR, sequenceName);
310     }
311     seq.setName(sequenceName);
312     addSeqObject(doc, seq);
313 }
314 
getCopiedSequenceObject(const QVariantMap & data,WorkflowContext * context,U2OpStatus2Log & os,const U2Region & reg=U2_REGION_MAX)315 inline static U2SequenceObject *getCopiedSequenceObject(const QVariantMap &data, WorkflowContext *context, U2OpStatus2Log &os, const U2Region &reg = U2_REGION_MAX) {
316     QScopedPointer<U2SequenceObject> seqObj(getSeqObj(data, context, os));
317     SAFE_POINT_OP(os, nullptr);
318 
319     SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
320     int refCount = seqId.constData()->getReferenceCount();
321     if (refCount > 2) {  // need to copy because it is used by another worker
322         DNASequence seq = seqObj->getSequence(reg, os);
323         CHECK_OP(os, nullptr);
324         U2EntityRef seqRef = U2SequenceUtils::import(os, context->getDataStorage()->getDbiRef(), seq);
325         CHECK_OP(os, nullptr);
326 
327         U2SequenceObject *clonedSeqObj = new U2SequenceObject(seqObj->getSequenceName(), seqRef);
328         U2AttributeUtils::copyObjectAttributes(seqObj->getEntityRef(), clonedSeqObj->getEntityRef(), os);
329 
330         return clonedSeqObj;
331     } else {
332         return seqObj.take();
333     }
334 }
335 
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)336 void FastaWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
337     CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
338     U2OpStatus2Log os;
339     QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
340     SAFE_POINT_OP(os, );
341 
342     QString sequenceName = data.value(BaseSlots::FASTA_HEADER_SLOT().getId(), QString()).toString();
343     if (sequenceName.isEmpty()) {
344         sequenceName = seqObj->getGObjectName();
345         if (sequenceName.isEmpty()) {
346             sequenceName = QString("unknown sequence %1").arg(entryNum);
347         }
348     } else {
349         QVariantMap info = seqObj->getSequenceInfo();
350         info.insert(DNAInfo::FASTA_HDR, sequenceName);
351         seqObj->setSequenceInfo(info);
352     }
353     seqObj->setGObjectName(sequenceName);
354 
355     QMap<GObjectType, QList<GObject *>> objectsMap;
356     {
357         QList<GObject *> seqs;
358         seqs << seqObj.data();
359         objectsMap[GObjectTypes::SEQUENCE] = seqs;
360     }
361     format->storeEntry(io, objectsMap, os);
362 }
363 
364 /*************************************
365  * FastQWriter
366  *************************************/
data2doc(Document * doc,const QVariantMap & data)367 void FastQWriter::data2doc(Document *doc, const QVariantMap &data) {
368     data2document(doc, data, context);
369 }
370 
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)371 void FastQWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
372     streamingStoreEntry(format, io, data, context, entryNum);
373 }
374 
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)375 void FastQWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
376     CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
377     SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
378     QScopedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
379     SAFE_POINT(nullptr != seqObj.data(), tr("Fastq writer: NULL sequence object"), );
380 
381     U2OpStatusImpl os;
382     DNASequence seq = seqObj->getWholeSequence(os);
383     SAFE_POINT_OP(os, );
384 
385     if (seq.getName().isEmpty()) {
386         seq.setName(QString("unknown sequence %1").arg(doc->getObjects().size()));
387     }
388     addSeqObject(doc, seq);
389 }
390 
hasDataToWrite(const QVariantMap & data) const391 bool FastQWriter::hasDataToWrite(const QVariantMap &data) const {
392     return SeqWriter::hasSequence(data);
393 }
394 
getObjectsToWrite(const QVariantMap & data) const395 QSet<GObject *> FastQWriter::getObjectsToWrite(const QVariantMap &data) const {
396     return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
397 }
398 
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)399 void FastQWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
400     CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
401     U2OpStatus2Log os;
402     QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
403     SAFE_POINT_OP(os, );
404 
405     if (seqObj->getGObjectName().isEmpty()) {
406         seqObj->setGObjectName(QString("unknown sequence %1").arg(entryNum));
407     }
408 
409     QMap<GObjectType, QList<GObject *>> objectsMap;
410     {
411         QList<GObject *> seqs;
412         seqs << seqObj.data();
413         objectsMap[GObjectTypes::SEQUENCE] = seqs;
414     }
415     format->storeEntry(io, objectsMap, os);
416 }
417 
418 /*************************************
419  * RawSeqWriter
420  *************************************/
data2doc(Document * doc,const QVariantMap & data)421 void RawSeqWriter::data2doc(Document *doc, const QVariantMap &data) {
422     data2document(doc, data, context);
423 }
424 
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)425 void RawSeqWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
426     streamingStoreEntry(format, io, data, context, entryNum);
427 }
428 
429 // same as FastQWriter::data2document
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)430 void RawSeqWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
431     CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
432     SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
433     QScopedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
434     SAFE_POINT(nullptr != seqObj.data(), tr("Raw sequence writer: NULL sequence object"), );
435 
436     U2OpStatusImpl os;
437     DNASequence seq = seqObj->getWholeSequence(os);
438     SAFE_POINT_OP(os, );
439 
440     if (seq.getName().isEmpty()) {
441         seq.setName(QString("unknown sequence %1").arg(doc->getObjects().size()));
442     }
443     addSeqObject(doc, seq);
444 }
445 
hasDataToWrite(const QVariantMap & data) const446 bool RawSeqWriter::hasDataToWrite(const QVariantMap &data) const {
447     return SeqWriter::hasSequence(data);
448 }
449 
getObjectToWrite(const QVariantMap & data) const450 GObject *RawSeqWriter::getObjectToWrite(const QVariantMap &data) const {
451     return SeqWriter::getSeqObject(data, context);
452 }
453 
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int)454 void RawSeqWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int) {
455     CHECK(data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()), );
456     U2OpStatus2Log os;
457     QScopedPointer<U2SequenceObject> seqObj(getCopiedSequenceObject(data, context, os));
458     SAFE_POINT_OP(os, );
459 
460     QMap<GObjectType, QList<GObject *>> objectsMap;
461     {
462         QList<GObject *> seqs;
463         seqs << seqObj.data();
464         objectsMap[GObjectTypes::SEQUENCE] = seqs;
465     }
466     format->storeEntry(io, objectsMap, os);
467 }
468 
469 /*************************************
470  * GenbankWriter
471  *************************************/
/**
 * Derives an annotation-table name from a sequence name.
 * Every SEQUENCE_TAG occurrence is replaced by FEATURES_TAG; without one, FEATURES_TAG is appended.
 */
inline static QString getAnnotationName(const QString &seqName) {
    QString annName(seqName);
    if (!annName.contains(SEQUENCE_TAG)) {
        annName += FEATURES_TAG;
        return annName;
    }
    annName.replace(SEQUENCE_TAG, FEATURES_TAG);
    return annName;
}
482 
data2doc(Document * doc,const QVariantMap & data)483 void GenbankWriter::data2doc(Document *doc, const QVariantMap &data) {
484     data2document(doc, data, context);
485 }
486 
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)487 void GenbankWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
488     streamingStoreEntry(format, io, data, context, entryNum);
489 }
490 
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)491 void GenbankWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
492     QScopedPointer<U2SequenceObject> seqObj(nullptr);
493     U2SequenceObject *dna = nullptr;
494     QString annotationName;
495 
496     if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
497         SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
498         seqObj.reset(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
499         SAFE_POINT(nullptr != seqObj.data(), tr("Genbank writer: NULL sequence object"), );
500 
501         U2OpStatusImpl os;
502         DNASequence seq = seqObj->getWholeSequence(os);
503         SAFE_POINT_OP(os, );
504         QMapIterator<QString, QVariant> it(seq.info);
505         while (it.hasNext()) {
506             it.next();
507             if (!(it.value().type() == QVariant::String || it.value().type() == QVariant::StringList)) {
508                 seq.info.remove(it.key());
509             }
510         }
511 
512         if (seq.getName().isEmpty()) {
513             int num = doc->findGObjectByType(GObjectTypes::SEQUENCE).size();
514             seq.setName(QString("unknown sequence %1").arg(num));
515         } else {
516             annotationName = getAnnotationName(seq.getName());
517         }
518 
519         dna = qobject_cast<U2SequenceObject *>(doc->findGObjectByName(seq.getName()));
520         if (!dna && !seq.isNull()) {
521             dna = addSeqObject(doc, seq);
522         }
523     }
524 
525     if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
526         const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
527         const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);
528 
529         if (!atl.isEmpty()) {
530             AnnotationTableObject *att = nullptr;
531             if (dna) {
532                 QList<GObject *> relAnns = GObjectUtils::findObjectsRelatedToObjectByRole(dna,
533                                                                                           GObjectTypes::ANNOTATION_TABLE,
534                                                                                           ObjectRole_Sequence,
535                                                                                           doc->getObjects(),
536                                                                                           UOF_LoadedOnly);
537                 att = relAnns.isEmpty() ? nullptr : qobject_cast<AnnotationTableObject *>(relAnns.first());
538             }
539             if (!att) {
540                 if (annotationName.isEmpty()) {
541                     int featuresNum = doc->findGObjectByType(GObjectTypes::ANNOTATION_TABLE).size();
542                     annotationName = QString("unknown features %1").arg(featuresNum);
543                 }
544                 att = qobject_cast<AnnotationTableObject *>(doc->findGObjectByName(annotationName));
545                 if (att == nullptr) {
546                     doc->addObject(att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef()));
547                     if (dna) {
548                         att->addObjectRelation(dna, ObjectRole_Sequence);
549                     }
550                 }
551                 algoLog.trace(QString("Adding features [%1] to GB doc %2").arg(annotationName).arg(doc->getURLString()));
552             }
553             att->addAnnotations(atl);
554         }
555     }
556 }
557 
hasDataToWrite(const QVariantMap & data) const558 bool GenbankWriter::hasDataToWrite(const QVariantMap &data) const {
559     return SeqWriter::hasSequenceOrAnns(data);
560 }
561 
getObjectsToWrite(const QVariantMap & data) const562 QSet<GObject *> GenbankWriter::getObjectsToWrite(const QVariantMap &data) const {
563     return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
564 }
565 
streamingStoreEntry(DocumentFormat * format,IOAdapter * io,const QVariantMap & data,WorkflowContext * context,int entryNum)566 void GenbankWriter::streamingStoreEntry(DocumentFormat *format, IOAdapter *io, const QVariantMap &data, WorkflowContext *context, int entryNum) {
567     U2OpStatus2Log os;
568     QScopedPointer<U2SequenceObject> seqObj(nullptr);
569     QString annotationName;
570     if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
571         seqObj.reset(getCopiedSequenceObject(data, context, os));
572         SAFE_POINT_OP(os, );
573 
574         if (seqObj->getGObjectName().isEmpty()) {
575             seqObj->setGObjectName(QString("unknown sequence %1").arg(entryNum));
576             annotationName = QString("unknown features %1").arg(entryNum);
577         } else {
578             annotationName = getAnnotationName(seqObj->getGObjectName());
579         }
580     }
581 
582     QList<GObject *> anObjList;
583     if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
584         const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
585         const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);
586 
587         if (!atl.isEmpty()) {
588             if (annotationName.isEmpty()) {
589                 annotationName = QString("unknown features %1").arg(entryNum);
590             }
591             AnnotationTableObject *att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef());
592             anObjList << att;
593             att->addAnnotations(atl);
594         }
595     }
596 
597     QMap<GObjectType, QList<GObject *>> objectsMap;
598     {
599         if (nullptr != seqObj.data()) {
600             QList<GObject *> seqs;
601             seqs << seqObj.data();
602             objectsMap[GObjectTypes::SEQUENCE] = seqs;
603         }
604         if (!anObjList.isEmpty()) {
605             objectsMap[GObjectTypes::ANNOTATION_TABLE] = anObjList;
606         }
607     }
608     CHECK(!objectsMap.isEmpty(), );
609 
610     format->storeEntry(io, objectsMap, os);
611 
612     foreach (GObject *o, anObjList) {
613         delete o;
614     }
615 }
616 
617 /*************************************
618  * GFFWriter
619  *************************************/
data2doc(Document * doc,const QVariantMap & data)620 void GFFWriter::data2doc(Document *doc, const QVariantMap &data) {
621     data2document(doc, data, context);
622 }
623 
hasDataToWrite(const QVariantMap & data) const624 bool GFFWriter::hasDataToWrite(const QVariantMap &data) const {
625     return SeqWriter::hasSequenceOrAnns(data);
626 }
627 
getObjectsToWrite(const QVariantMap & data) const628 QSet<GObject *> GFFWriter::getObjectsToWrite(const QVariantMap &data) const {
629     return QSet<GObject *>() << SeqWriter::getSeqObject(data, context) << SeqWriter::getAnnObject(data, context);
630 }
631 
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)632 void GFFWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
633     QScopedPointer<U2SequenceObject> seqObj(nullptr);
634     U2SequenceObject *dna = nullptr;
635     QString annotationName;
636     if (data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId())) {
637         SharedDbiDataHandler seqId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
638         seqObj.reset(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
639         SAFE_POINT(!seqObj.isNull(), tr("GFF writer: NULL sequence object"), );
640 
641         U2OpStatusImpl os;
642         DNASequence seq = seqObj->getWholeSequence(os);
643         SAFE_POINT_OP(os, );
644         if (seq.getName().isEmpty()) {
645             int num = doc->findGObjectByType(GObjectTypes::SEQUENCE).size();
646             seq.setName(QString("unknown sequence %1").arg(num));
647         } else {
648             annotationName = getAnnotationName(seq.getName());
649         }
650 
651         dna = qobject_cast<U2SequenceObject *>(doc->findGObjectByName(seq.getName()));
652         if (!dna && !seq.isNull()) {
653             dna = addSeqObject(doc, seq);
654         }
655     }
656 
657     if (data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId())) {
658         const QVariant &annsVar = data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()];
659         const QList<SharedAnnotationData> atl = StorageUtils::getAnnotationTable(context->getDataStorage(), annsVar);
660 
661         if (!atl.isEmpty()) {
662             AnnotationTableObject *att = nullptr;
663             if (nullptr != dna) {
664                 QList<GObject *> relAnns = GObjectUtils::findObjectsRelatedToObjectByRole(dna, GObjectTypes::ANNOTATION_TABLE, ObjectRole_Sequence, doc->getObjects(), UOF_LoadedOnly);
665                 att = relAnns.isEmpty() ? nullptr : qobject_cast<AnnotationTableObject *>(relAnns.first());
666             }
667             if (nullptr == att) {
668                 if (annotationName.isEmpty()) {
669                     int featuresNum = doc->findGObjectByType(GObjectTypes::ANNOTATION_TABLE).size();
670                     annotationName = QString("unknown features %1").arg(featuresNum);
671                 }
672                 att = qobject_cast<AnnotationTableObject *>(doc->findGObjectByName(annotationName));
673                 if (nullptr == att) {
674                     doc->addObject(att = new AnnotationTableObject(annotationName, context->getDataStorage()->getDbiRef()));
675                     if (nullptr != dna) {
676                         att->addObjectRelation(dna, ObjectRole_Sequence);
677                     }
678                 }
679                 algoLog.trace(QString("Adding features [%1] to GFF doc %2").arg(annotationName).arg(doc->getURLString()));
680             }
681             att->addAnnotations(atl);
682         }
683     }
684 }
685 
686 /*************************************
687  * SeqWriter
688  *************************************/
SeqWriter(Actor * a)689 SeqWriter::SeqWriter(Actor *a)
690     : BaseDocWriter(a), numSplitSequences(1), currentSplitSequence(0) {
691 }
692 
SeqWriter(Actor * a,const DocumentFormatId & fid)693 SeqWriter::SeqWriter(Actor *a, const DocumentFormatId &fid)
694     : BaseDocWriter(a, fid), numSplitSequences(1), currentSplitSequence(0) {
695 }
696 
data2doc(Document * doc,const QVariantMap & data)697 void SeqWriter::data2doc(Document *doc, const QVariantMap &data) {
698     if (nullptr == format) {
699         return;
700     }
701     DocumentFormatId fid = format->getFormatId();
702     if (BaseDocumentFormats::FASTA == fid) {
703         FastaWriter::data2document(doc, data, context, numSplitSequences, currentSplitSequence);
704         currentSplitSequence++;
705     } else if (BaseDocumentFormats::PLAIN_GENBANK == fid) {
706         GenbankWriter::data2document(doc, data, context);
707     } else if (BaseDocumentFormats::FASTQ == fid) {
708         FastQWriter::data2document(doc, data, context);
709     } else if (BaseDocumentFormats::RAW_DNA_SEQUENCE == fid) {
710         RawSeqWriter::data2document(doc, data, context);
711     } else if (BaseDocumentFormats::GFF == fid) {
712         GFFWriter::data2document(doc, data, context);
713     } else {
714         assert(0);
715         ioLog.error(QString("Unknown data format for writing: %1").arg(fid));
716     }
717 }
718 
hasSequence(const QVariantMap & data)719 bool SeqWriter::hasSequence(const QVariantMap &data) {
720     return data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId());
721 }
722 
hasSequenceOrAnns(const QVariantMap & data)723 bool SeqWriter::hasSequenceOrAnns(const QVariantMap &data) {
724     return data.contains(BaseSlots::DNA_SEQUENCE_SLOT().getId()) || data.contains(BaseSlots::ANNOTATION_TABLE_SLOT().getId());
725 }
726 
hasDataToWrite(const QVariantMap & data) const727 bool SeqWriter::hasDataToWrite(const QVariantMap &data) const {
728     if (nullptr != format) {
729         DocumentFormatId fid = format->getFormatId();
730         if (BaseDocumentFormats::GFF == fid || BaseDocumentFormats::PLAIN_GENBANK == fid) {
731             return hasSequenceOrAnns(data);
732         } else {
733             return hasSequence(data);
734         }
735     } else if (dstDbiRef.isValid()) {
736         return hasSequenceOrAnns(data);
737     } else {
738         return false;
739     }
740 }
741 
getSeqObject(const QVariantMap & data,WorkflowContext * context)742 GObject *SeqWriter::getSeqObject(const QVariantMap &data, WorkflowContext *context) {
743     SharedDbiDataHandler objId = data[BaseSlots::DNA_SEQUENCE_SLOT().getId()].value<SharedDbiDataHandler>();
744     return StorageUtils::getSequenceObject(context->getDataStorage(), objId);
745 }
746 
getAnnObject(const QVariantMap & data,WorkflowContext * context)747 GObject *SeqWriter::getAnnObject(const QVariantMap &data, WorkflowContext *context) {
748     const QList<SharedAnnotationData> anns = StorageUtils::getAnnotationTable(context->getDataStorage(), data[BaseSlots::ANNOTATION_TABLE_SLOT().getId()]);
749     CHECK(!anns.isEmpty(), nullptr);
750     QScopedPointer<U2SequenceObject> seqObj(qobject_cast<U2SequenceObject *>(getSeqObject(data, context)));
751     QString seqName = "Unknown";
752     if (!seqObj.isNull()) {
753         seqName = seqObj->getSequenceName();
754     }
755     AnnotationTableObject *annObj = new AnnotationTableObject(seqName + " features", context->getDataStorage()->getDbiRef());
756     annObj->addAnnotations(anns);
757     return annObj;
758 }
759 
getObjectsToWrite(const QVariantMap & data) const760 QSet<GObject *> SeqWriter::getObjectsToWrite(const QVariantMap &data) const {
761     return QSet<GObject *>() << getSeqObject(data, context) << getAnnObject(data, context);
762 }
763 
storeEntry(IOAdapter * io,const QVariantMap & data,int entryNum)764 void SeqWriter::storeEntry(IOAdapter *io, const QVariantMap &data, int entryNum) {
765     if (format == nullptr) {
766         return;
767     }
768     DocumentFormatId fid = format->getFormatId();
769     if (fid == BaseDocumentFormats::FASTA) {
770         FastaWriter::streamingStoreEntry(format, io, data, context, entryNum);
771         currentSplitSequence++;
772     } else if (fid == BaseDocumentFormats::PLAIN_GENBANK) {
773         GenbankWriter::streamingStoreEntry(format, io, data, context, entryNum);
774     } else if (fid == BaseDocumentFormats::FASTQ) {
775         FastQWriter::streamingStoreEntry(format, io, data, context, entryNum);
776     } else if (fid == BaseDocumentFormats::RAW_DNA_SEQUENCE) {
777         RawSeqWriter::streamingStoreEntry(format, io, data, context, entryNum);
778     } else {
779         assert(0);
780         ioLog.error(QString("Unknown data format for writing: %1").arg(fid));
781     }
782 }
783 
takeParameters(U2OpStatus & os)784 void SeqWriter::takeParameters(U2OpStatus &os) {
785     BaseDocWriter::takeParameters(os);
786     SAFE_POINT_OP(os, );
787 
788     Attribute *splitAttr = actor->getParameter(BaseAttributes::SPLIT_SEQ_ATTRIBUTE().getId());
789     if (nullptr != format && format->getFormatId() == BaseDocumentFormats::FASTA && splitAttr != nullptr) {
790         numSplitSequences = splitAttr->getAttributeValue<int>(context);
791     } else {
792         numSplitSequences = 1;
793     }
794 }
795 
takeUrlList(const QVariantMap & data,int metadataId,U2OpStatus & os)796 QStringList SeqWriter::takeUrlList(const QVariantMap &data, int metadataId, U2OpStatus &os) {
797     QStringList urls = BaseDocWriter::takeUrlList(data, metadataId, os);
798     CHECK_OP(os, urls);
799     SAFE_POINT(1 == urls.size(), "Several urls in the output attribute", urls);
800 
801     SharedDbiDataHandler seqId = data.value(BaseSlots::DNA_SEQUENCE_SLOT().getId()).value<SharedDbiDataHandler>();
802     QSharedPointer<U2SequenceObject> seqObj(StorageUtils::getSequenceObject(context->getDataStorage(), seqId));
803 
804     currentSplitSequence = 0;
805 
806     if (!seqObj.data()) {
807         numSplitSequences = 1;
808     } else {
809         qint64 seqLen = seqObj.data()->getSequenceLength();
810         if (seqLen < numSplitSequences) {
811             numSplitSequences = seqLen;
812         }
813         if (0 == numSplitSequences) {
814             numSplitSequences = 1;
815         }
816     }
817 
818     if (numSplitSequences > 1) {
819         QString url = urls.takeFirst();
820         for (int i = 0; i < numSplitSequences; i++) {
821             urls << GUrlUtils::insertSuffix(url, "_split" + QString::number(i + 1));
822         }
823     }
824 
825     return urls;
826 }
827 
isStreamingSupport() const828 bool SeqWriter::isStreamingSupport() const {
829     if (numSplitSequences > 1) {
830         return false;
831     }
832     return BaseDocWriter::isStreamingSupport();
833 }
834 
835 /*************************************
836  * MSAWriter
837  *************************************/
data2doc(Document * doc,const QVariantMap & data)838 void MSAWriter::data2doc(Document *doc, const QVariantMap &data) {
839     data2document(doc, data, context);
840 }
841 
data2document(Document * doc,const QVariantMap & data,WorkflowContext * context)842 void MSAWriter::data2document(Document *doc, const QVariantMap &data, WorkflowContext *context) {
843     SharedDbiDataHandler msaId = data.value(BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId()).value<SharedDbiDataHandler>();
844     QScopedPointer<MultipleSequenceAlignmentObject> msaObj(StorageUtils::getMsaObject(context->getDataStorage(), msaId));
845     SAFE_POINT(!msaObj.isNull(), "NULL MSA Object!", );
846     MultipleSequenceAlignment msa = msaObj->getMsaCopy();
847 
848     SAFE_POINT(!msa->isEmpty(), tr("Empty alignment passed for writing to %1").arg(doc->getURLString()), )
849 
850     if (msa->getName().isEmpty()) {
851         QString name = QString(MA_OBJECT_NAME + "_%1").arg(ct);
852         msa->setName(name);
853         ct++;
854     }
855 
856     U2OpStatus2Log os;
857     MultipleSequenceAlignmentObject *obj = MultipleSequenceAlignmentImporter::createAlignment(doc->getDbiRef(), msa, os);
858     CHECK_OP(os, );
859 
860     doc->addObject(obj);
861 }
862 
hasDataToWrite(const QVariantMap & data) const863 bool MSAWriter::hasDataToWrite(const QVariantMap &data) const {
864     return data.contains(BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId());
865 }
866 
getObjectsToWrite(const QVariantMap & data) const867 QSet<GObject *> MSAWriter::getObjectsToWrite(const QVariantMap &data) const {
868     SharedDbiDataHandler objId = data[BaseSlots::MULTIPLE_ALIGNMENT_SLOT().getId()].value<SharedDbiDataHandler>();
869     return QSet<GObject *>() << StorageUtils::getMsaObject(context->getDataStorage(), objId);
870 }
871 
isStreamingSupport() const872 bool MSAWriter::isStreamingSupport() const {
873     return false;
874 }
875 
876 /*************************************
877  * DataWorkerFactory
878  *************************************/
createWorker(Actor * a)879 Worker *DataWorkerFactory::createWorker(Actor *a) {
880     // TODO: wtf is this??
881     //  each actor must have own factory
882 
883     BaseWorker *w = nullptr;
884     QString protoId = a->getProto()->getId();
885     if (CoreLibConstants::READ_TEXT_PROTO_ID == protoId) {
886         TextReader *t = new TextReader(a);
887         w = t;
888     } else if (CoreLibConstants::WRITE_TEXT_PROTO_ID == protoId) {
889         w = new TextWriter(a);
890     } else if (CoreLibConstants::WRITE_FASTA_PROTO_ID == protoId) {
891         w = new FastaWriter(a);
892     } else if (CoreLibConstants::WRITE_GENBANK_PROTO_ID == protoId) {
893         w = new GenbankWriter(a);
894     } else if (CoreLibConstants::WRITE_CLUSTAL_PROTO_ID == protoId) {
895         w = new MSAWriter(a, BaseDocumentFormats::CLUSTAL_ALN);
896     } else if (CoreLibConstants::WRITE_STOCKHOLM_PROTO_ID == protoId) {
897         w = new MSAWriter(a, BaseDocumentFormats::STOCKHOLM);
898     } else if (CoreLibConstants::GENERIC_READ_MA_PROTO_ID == protoId) {
899         w = new GenericMSAReader(a);
900     } else if (CoreLibConstants::GENERIC_READ_SEQ_PROTO_ID == protoId) {
901         w = new GenericSeqReader(a);
902     } else if (CoreLibConstants::WRITE_MSA_PROTO_ID == protoId) {
903         w = new MSAWriter(a);
904     } else if (CoreLibConstants::WRITE_SEQ_PROTO_ID == protoId) {
905         w = new SeqWriter(a);
906     } else if (CoreLibConstants::WRITE_FASTQ_PROTO_ID == protoId) {
907         w = new FastQWriter(a);
908     } else {
909         assert(0);
910     }
911     return w;
912 }
913 
init()914 void DataWorkerFactory::init() {
915     DomainFactory *localDomain = WorkflowEnv::getDomainRegistry()->getById(LocalDomainFactory::ID);
916     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_FASTA_PROTO_ID));
917     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_GENBANK_PROTO_ID));
918     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::READ_TEXT_PROTO_ID));
919     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_TEXT_PROTO_ID));
920     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::GENERIC_READ_SEQ_PROTO_ID));
921     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::GENERIC_READ_MA_PROTO_ID));
922     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_CLUSTAL_PROTO_ID));
923     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_STOCKHOLM_PROTO_ID));
924     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_MSA_PROTO_ID));
925     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_SEQ_PROTO_ID));
926     localDomain->registerEntry(new DataWorkerFactory(CoreLibConstants::WRITE_FASTQ_PROTO_ID));
927 }
928 
929 }  // namespace LocalWorkflow
930 }  // namespace U2
931