1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include <limits>
23 #include <time.h>
24 
25 #include <QFile>
26 #include <QScopedPointer>
27 
28 #include <U2Core/AppContext.h>
29 #include <U2Core/Counter.h>
30 #include <U2Core/IOAdapterUtils.h>
31 #include <U2Core/L10n.h>
32 #include <U2Core/U2AssemblyUtils.h>
33 #include <U2Core/U2CoreAttributes.h>
34 #include <U2Core/U2Dbi.h>
35 #include <U2Core/U2DbiRegistry.h>
36 #include <U2Core/U2OpStatusUtils.h>
37 #include <U2Core/U2SafePoints.h>
38 
39 #include "BAMDbiPlugin.h"
40 #include "CancelledException.h"
41 #include "ConvertToSQLiteTask.h"
42 #include "Dbi.h"
43 #include "IOException.h"
44 #include "Index.h"
45 #include "LoadBamInfoTask.h"
46 #include "Reader.h"
47 #include "SamReader.h"
48 
49 namespace U2 {
50 namespace BAM {
51 
ConvertToSQLiteTask(const GUrl & _sourceUrl,const U2DbiRef & dstDbiRef,BAMInfo & _bamInfo,bool _sam)52 ConvertToSQLiteTask::ConvertToSQLiteTask(const GUrl &_sourceUrl, const U2DbiRef &dstDbiRef, BAMInfo &_bamInfo, bool _sam)
53     : Task(tr("Convert BAM to UGENE database (%1)").arg(_sourceUrl.fileName()), TaskFlag_None),
54       sourceUrl(_sourceUrl),
55       dstDbiRef(dstDbiRef),
56       bamInfo(_bamInfo),
57       sam(_sam) {
58     GCOUNTER(cvar, "ConvertBamToUgenedb");
59     tpm = Progress_Manual;
60 }
61 
enableCoverageOnImport(U2AssemblyCoverageImportInfo & cii,int referenceLength)62 static void enableCoverageOnImport(U2AssemblyCoverageImportInfo &cii, int referenceLength) {
63     cii.computeCoverage = true;
64     int coverageInfoSize = qMin(U2AssemblyUtils::MAX_COVERAGE_VECTOR_SIZE, referenceLength);
65     cii.coverageBasesPerPoint = qMax(1.0, ((double)referenceLength) / coverageInfoSize);
66     cii.coverage.resize(coverageInfoSize);
67 }
68 
69 namespace {
70 
71 class BamIterator : public Iterator {
72 public:
BamIterator(BamReader & reader)73     BamIterator(BamReader &reader)
74         : reader(reader),
75           alignmentReader(nullptr, 0, 0),
76           alignmentReaderValid(false),
77           readValid(false) {
78     }
79 
hasNext()80     virtual bool hasNext() {
81         return readValid || alignmentReaderValid || !reader.isEof();
82     }
83 
next()84     virtual U2AssemblyRead next() {
85         if (!hasNext()) {
86             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
87         }
88         if (!readValid) {
89             if (!alignmentReaderValid) {
90                 alignmentReader = reader.getAlignmentReader();
91             }
92             alignmentReaderValid = false;
93             read = AssemblyDbi::alignmentToRead(alignmentReader.read());
94         }
95         readValid = false;
96         return read;
97     }
98 
skip()99     virtual void skip() {
100         if (!hasNext()) {
101             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
102         }
103         if (!readValid) {
104             if (!alignmentReaderValid) {
105                 alignmentReader = reader.getAlignmentReader();
106             }
107             alignmentReaderValid = false;
108             alignmentReader.skip();
109         }
110         readValid = false;
111     }
112 
peek()113     virtual const U2AssemblyRead &peek() {
114         if (!hasNext()) {
115             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
116         }
117         if (!readValid) {
118             if (!alignmentReaderValid) {
119                 alignmentReader = reader.getAlignmentReader();
120             }
121             alignmentReaderValid = false;
122             read = AssemblyDbi::alignmentToRead(alignmentReader.read());
123             readValid = true;
124         }
125         return read;
126     }
127 
peekReferenceId()128     virtual int peekReferenceId() {
129         if (!hasNext()) {
130             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
131         }
132         if (!readValid) {
133             if (!alignmentReaderValid) {
134                 alignmentReader = reader.getAlignmentReader();
135                 alignmentReaderValid = true;
136             }
137         }
138         return alignmentReader.getId();
139     }
140 
141 private:
142     BamReader &reader;
143     BamReader::AlignmentReader alignmentReader;
144     bool alignmentReaderValid;
145     U2AssemblyRead read;
146     bool readValid;
147 };
148 
149 class SamIterator : public Iterator {
150 public:
SamIterator(SamReader & reader)151     SamIterator(SamReader &reader)
152         : reader(reader),
153           readReferenceId(-1),
154           readValid(false) {
155     }
156 
hasNext()157     virtual bool hasNext() {
158         return readValid || !reader.isEof();
159     }
160 
next()161     virtual U2AssemblyRead next() {
162         if (!hasNext()) {
163             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
164         }
165         if (!readValid) {
166             bool eof = false;
167             read = AssemblyDbi::alignmentToRead(reader.readAlignment(eof));
168         }
169         readValid = false;
170         return read;
171     }
172 
skip()173     virtual void skip() {
174         next();
175     }
176 
peek()177     virtual const U2AssemblyRead &peek() {
178         if (!hasNext()) {
179             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
180         }
181         if (!readValid) {
182             bool eof = false;
183             Alignment alignemnt = reader.readAlignment(eof);
184             readReferenceId = alignemnt.getReferenceId();
185             read = AssemblyDbi::alignmentToRead(alignemnt);
186             readValid = true;
187         }
188         return read;
189     }
190 
peekReferenceId()191     virtual int peekReferenceId() {
192         if (!hasNext()) {
193             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
194         }
195         if (!readValid) {
196             bool eof = false;
197             Alignment alignemnt = reader.readAlignment(eof);
198             readReferenceId = alignemnt.getReferenceId();
199             read = AssemblyDbi::alignmentToRead(alignemnt);
200             readValid = true;
201         }
202         return readReferenceId;
203     }
204 
205 private:
206     SamReader &reader;
207     U2AssemblyRead read;
208     int readReferenceId;
209     bool readValid;
210 };
211 
212 class ReferenceIterator : public Iterator {
213 public:
ReferenceIterator(int referenceId,Iterator & iterator)214     ReferenceIterator(int referenceId, Iterator &iterator)
215         : referenceId(referenceId),
216           iterator(iterator) {
217     }
218 
hasNext()219     virtual bool hasNext() {
220         return iterator.hasNext() && (iterator.peekReferenceId() == referenceId);
221     }
222 
next()223     virtual U2AssemblyRead next() {
224         if (!hasNext()) {
225             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
226         }
227         return iterator.next();
228     }
229 
skip()230     virtual void skip() {
231         if (!hasNext()) {
232             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
233         }
234         iterator.skip();
235     }
236 
peek()237     virtual const U2AssemblyRead &peek() {
238         if (!hasNext()) {
239             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
240         }
241         return iterator.peek();
242     }
243 
peekReferenceId()244     virtual int peekReferenceId() {
245         if (!hasNext()) {
246             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
247         }
248         return iterator.peekReferenceId();
249     }
250 
251 private:
252     int referenceId;
253     Iterator &iterator;
254 };
255 
256 class SkipUnmappedIterator : public Iterator {
257 public:
SkipUnmappedIterator(Iterator & iterator)258     SkipUnmappedIterator(Iterator &iterator)
259         : iterator(iterator) {
260     }
261 
hasNext()262     virtual bool hasNext() {
263         skipUnmappedReads();
264         return iterator.hasNext();
265     }
266 
next()267     virtual U2AssemblyRead next() {
268         skipUnmappedReads();
269         if (!hasNext()) {
270             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
271         }
272         return iterator.next();
273     }
274 
skip()275     virtual void skip() {
276         skipUnmappedReads();
277         if (!hasNext()) {
278             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
279         }
280         iterator.skip();
281     }
282 
peek()283     virtual const U2AssemblyRead &peek() {
284         skipUnmappedReads();
285         if (!hasNext()) {
286             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
287         }
288         return iterator.peek();
289     }
290 
peekReferenceId()291     virtual int peekReferenceId() {
292         skipUnmappedReads();
293         if (!hasNext()) {
294             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
295         }
296         return iterator.peekReferenceId();
297     }
298 
299 private:
skipUnmappedReads()300     void skipUnmappedReads() {
301         while (iterator.hasNext()) {
302             if (-1 == iterator.peekReferenceId() ||
303                 ReadFlagsUtils::isUnmappedRead(iterator.peek()->flags) ||
304                 iterator.peek()->cigar.isEmpty()) {
305                 iterator.skip();
306             } else {
307                 break;
308             }
309         }
310     }
311 
312 private:
313     Iterator &iterator;
314 };
315 
316 class DbiIterator : public U2DbiIterator<U2AssemblyRead> {
317 public:
~DbiIterator()318     virtual ~DbiIterator() {
319     }
320 
321     virtual bool hasNext() = 0;
322 
323     virtual U2AssemblyRead next() = 0;
324 
325     virtual U2AssemblyRead peek() = 0;
326 
327     virtual qint64 getReadsImported() = 0;
328 };
329 
330 class SequentialDbiIterator : public DbiIterator {
331 public:
SequentialDbiIterator(int referenceId,bool skipUnmapped,Iterator & inputIterator,TaskStateInfo & stateInfo,const IOAdapter & ioAdapter)332     SequentialDbiIterator(int referenceId, bool skipUnmapped, Iterator &inputIterator, TaskStateInfo &stateInfo, const IOAdapter &ioAdapter)
333         : referenceIterator(referenceId, inputIterator),
334           skipUnmappedIterator(skipUnmapped ? new SkipUnmappedIterator(referenceIterator) : nullptr),
335           iterator(skipUnmapped ? dynamic_cast<Iterator *>(skipUnmappedIterator.data()) : dynamic_cast<Iterator *>(&referenceIterator)),
336           readsImported(0),
337           stateInfo(stateInfo),
338           ioAdapter(ioAdapter) {
339     }
340 
hasNext()341     virtual bool hasNext() {
342         if (stateInfo.isCanceled()) {
343             throw CancelledException(BAMDbiPlugin::tr("Task was cancelled"));
344         }
345         return iterator->hasNext();
346     }
347 
next()348     virtual U2AssemblyRead next() {
349         if (!hasNext()) {
350             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
351         }
352         stateInfo.progress = ioAdapter.getProgress();
353         readsImported++;
354         return iterator->next();
355     }
356 
peek()357     virtual U2AssemblyRead peek() {
358         if (!hasNext()) {
359             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
360         }
361         return iterator->peek();
362     }
363 
getReadsImported()364     virtual qint64 getReadsImported() {
365         return readsImported;
366     }
367 
368 private:
369     ReferenceIterator referenceIterator;
370     QScopedPointer<SkipUnmappedIterator> skipUnmappedIterator;
371     Iterator *iterator;
372     qint64 readsImported;
373     TaskStateInfo &stateInfo;
374     const IOAdapter &ioAdapter;
375 };
376 
377 class IndexedBamDbiIterator : public DbiIterator {
378 public:
IndexedBamDbiIterator(int referenceId,bool skipUnmapped,BamReader & reader,const Index & index,TaskStateInfo & stateInfo,const IOAdapter & ioAdapter)379     IndexedBamDbiIterator(int referenceId, bool skipUnmapped, BamReader &reader, const Index &index, TaskStateInfo &stateInfo, const IOAdapter &ioAdapter)
380         : iterator(reader),
381           dbiIterator(referenceId, skipUnmapped, iterator, stateInfo, ioAdapter),
382           hasReads(false) {
383         {
384             VirtualOffset minOffset = VirtualOffset(0xffffffffffffLL, 0xffff);
385             foreach (const Index::ReferenceIndex::Bin &bin, index.getReferenceIndices()[referenceId].getBins()) {
386                 foreach (const Index::ReferenceIndex::Chunk &chunk, bin.getChunks()) {
387                     if (minOffset > chunk.getStart()) {
388                         minOffset = chunk.getStart();
389                         hasReads = true;
390                     }
391                 }
392             }
393             if (hasReads) {
394                 reader.seek(minOffset);
395             }
396         }
397     }
398 
hasNext()399     virtual bool hasNext() {
400         return hasReads && dbiIterator.hasNext();
401     }
402 
next()403     virtual U2AssemblyRead next() {
404         if (!hasNext()) {
405             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
406         }
407         return dbiIterator.next();
408     }
409 
peek()410     virtual U2AssemblyRead peek() {
411         if (!hasNext()) {
412             throw Exception(BAMDbiPlugin::tr("The iteration has no next element"));
413         }
414         return dbiIterator.peek();
415     }
416 
getReadsImported()417     virtual qint64 getReadsImported() {
418         return dbiIterator.getReadsImported();
419     }
420 
421 private:
422     BamIterator iterator;
423     SequentialDbiIterator dbiIterator;
424     bool hasReads;
425 };
426 
// Maximum number of selected reads collected before they are flushed to the importers during sequential import.
static constexpr int READS_CHUNK_SIZE = 250000;
428 
429 }  // namespace
430 
run()431 void ConvertToSQLiteTask::run() {
432     try {
433         taskLog.info(tr("Converting assembly from %1 to %2 started")
434                          .arg(sourceUrl.fileName())
435                          .arg(getDestinationUrl().fileName()));
436 
437         time_t startTime = time(0);
438 
439         qint64 totalReadsImported = importReads();
440 
441         time_t packStart = time(0);
442         packReads();
443         time_t packTime = time(0) - packStart;
444 
445         updateAttributes();
446 
447         foreach (AssemblyImporter *importer, importers) {
448             importedAssemblies << importer->getAssembly();
449         }
450         qDeleteAll(importers);
451         importers.clear();
452 
453         time_t totalTime = time(0) - startTime;
454 
455         taskLog.info(QString("Converting assembly from %1 to %2 successfully finished: imported %3 reads, total time %4 s, pack time %5 s")
456                          .arg(sourceUrl.fileName())
457                          .arg(getDestinationUrl().fileName())
458                          .arg(totalReadsImported)
459                          .arg(totalTime)
460                          .arg(packTime));
461 
462     } catch (const CancelledException & /*e*/) {
463         qDeleteAll(importers);
464         importers.clear();
465         if (getDestinationUrl().isLocalFile()) {
466             QFile::remove(getDestinationUrl().getURLString());
467         }
468         taskLog.info(tr("Converting assembly from %1 to %2 cancelled")
469                          .arg(sourceUrl.fileName())
470                          .arg(getDestinationUrl().fileName()));
471     } catch (const Exception &e) {
472         qDeleteAll(importers);
473         importers.clear();
474         setError(tr("Converting assembly from %1 to %2 failed: %3")
475                      .arg(sourceUrl.fileName())
476                      .arg(getDestinationUrl().fileName())
477                      .arg(e.getMessage()));
478         if (getDestinationUrl().isLocalFile()) {
479             QFile::remove(getDestinationUrl().getURLString());
480         }
481     }
482 }
483 
getDestinationUrl() const484 GUrl ConvertToSQLiteTask::getDestinationUrl() const {
485     return GUrl(U2DbiUtils::ref2Url(dstDbiRef));
486 }
487 
getAssemblies() const488 QList<U2Assembly> ConvertToSQLiteTask::getAssemblies() const {
489     return importedAssemblies;
490 }
491 
isSorted(Reader * reader) const492 bool ConvertToSQLiteTask::isSorted(Reader *reader) const {
493     return Header::Coordinate == reader->getHeader().getSortingOrder() ||
494            Header::QueryName == reader->getHeader().getSortingOrder() ||
495            bamInfo.hasIndex();
496 }
497 
importReads()498 qint64 ConvertToSQLiteTask::importReads() {
499     qint64 totalReadsImported = 0;
500     QScopedPointer<IOAdapter> ioAdapter(prepareIoAdapter());
501 
502     BamReader *bamReader = nullptr;
503     SamReader *samReader = nullptr;
504     QScopedPointer<Reader> reader(nullptr);
505     if (sam) {
506         samReader = new SamReader(*ioAdapter);
507         reader.reset(samReader);
508     } else {
509         bamReader = new BamReader(*ioAdapter);
510         reader.reset(bamReader);
511     }
512 
513     references = reader->getHeader().getReferences();
514 
515     for (int referenceId = -1; referenceId < references.size(); referenceId++) {
516         importers[referenceId] = new AssemblyImporter(stateInfo);
517     }
518 
519     stateInfo.setDescription("Importing reads");
520 
521     if (isSorted(reader.data())) {
522         totalReadsImported += importSortedReads(samReader, bamReader, reader.data(), ioAdapter.data());
523     } else {
524         totalReadsImported += importUnsortedReads(samReader, bamReader, reader.data(), importInfos);
525     }
526 
527     foreach (int referenceId, importers.keys()) {
528         if (!importers[referenceId]->isObjectExist()) {
529             delete importers[referenceId];
530             importers.remove(referenceId);
531         }
532     }
533 
534     return totalReadsImported;
535 }
536 
packReads()537 void ConvertToSQLiteTask::packReads() {
538     stateInfo.setDescription("Packing reads");
539 
540     U2OpStatusImpl opStatus;
541     foreach (int referenceId, importers.keys()) {
542         SAFE_POINT_EXT(importers.contains(referenceId), throw Exception("An unexpected assembly"), );
543         taskLog.details(tr("Packing reads for assembly '%1' (%2 of %3)")
544                             .arg(importers[referenceId]->getAssembly().visualName)
545                             .arg(referenceId + 1)
546                             .arg(importInfos.size()));
547 
548         importers[referenceId]->packReads(importInfos[referenceId]);
549         CHECK_EXT(!opStatus.isCoR(), throw Exception(opStatus.getError()), );
550     }
551 }
552 
updateAttributes()553 void ConvertToSQLiteTask::updateAttributes() {
554     DbiConnection connection(dstDbiRef, stateInfo);
555     SAFE_POINT_EXT(!stateInfo.hasError(), throw Exception(getError()), );
556     U2AttributeDbi *attributeDbi = connection.dbi->getAttributeDbi();
557     CHECK(nullptr != attributeDbi, );
558 
559     foreach (int referenceId, importers.keys()) {
560         const U2Assembly &assembly = importers[referenceId]->getAssembly();
561 
562         if (-1 != referenceId) {
563             updateReferenceLengthAttribute(references[referenceId].getLength(), assembly, attributeDbi);
564             updateReferenceMd5Attribute(references[referenceId].getMd5(), assembly, attributeDbi);
565             updateReferenceSpeciesAttribute(references[referenceId].getSpecies(), assembly, attributeDbi);
566             updateReferenceUriAttribute(references[referenceId].getUri(), assembly, attributeDbi);
567         }
568 
569         const U2AssemblyReadsImportInfo &importInfo = importInfos[referenceId];
570 
571         updateImportInfoMaxProwAttribute(importInfo, assembly, attributeDbi);
572         updateImportInfoReadsCountAttribute(importInfo, assembly, attributeDbi);
573         updateImportInfoCoverageStatAttribute(importInfo, assembly, attributeDbi);
574     }
575 }
576 
importSortedReads(SamReader * samReader,BamReader * bamReader,Reader * reader,IOAdapter * ioAdapter)577 qint64 ConvertToSQLiteTask::importSortedReads(SamReader *samReader, BamReader *bamReader, Reader *reader, IOAdapter *ioAdapter) {
578     qint64 totalReadsImported = 0;
579 
580     QScopedPointer<Iterator> iterator;
581     if (!bamInfo.hasIndex()) {
582         if (sam) {
583             iterator.reset(new SamIterator(*samReader));
584         } else {
585             iterator.reset(new BamIterator(*bamReader));
586         }
587     }
588 
589     totalReadsImported += importMappedSortedReads(bamReader, reader, iterator.data(), ioAdapter);
590 
591     if (bamInfo.isUnmappedSelected()) {
592         totalReadsImported += importUnmappedSortedReads(bamReader, reader, iterator, ioAdapter);
593     }
594 
595     return totalReadsImported;
596 }
597 
importMappedSortedReads(BamReader * bamReader,Reader * reader,Iterator * iterator,IOAdapter * ioAdapter)598 qint64 ConvertToSQLiteTask::importMappedSortedReads(BamReader *bamReader, Reader *reader, Iterator *iterator, IOAdapter *ioAdapter) {
599     qint64 totalReadsImported = 0;
600 
601     const QList<Header::Reference> references = reader->getHeader().getReferences();
602 
603     for (int referenceId = 0; referenceId < references.size(); referenceId++) {
604         if (bamInfo.isReferenceSelected(referenceId)) {
605             U2Assembly assembly;
606             assembly.visualName = references[referenceId].getName();
607             taskLog.details(tr("Importing assembly '%1' (%2 of %3)")
608                                 .arg(assembly.visualName)
609                                 .arg(referenceId + 1)
610                                 .arg(references.size()));
611 
612             U2AssemblyReadsImportInfo &importInfo = importInfos[referenceId];
613             enableCoverageOnImport(importInfo.coverageInfo, references[referenceId].getLength());
614 
615             QScopedPointer<DbiIterator> dbiIterator;
616             if (bamInfo.hasIndex()) {
617                 dbiIterator.reset(new IndexedBamDbiIterator(referenceId, !bamInfo.isUnmappedSelected(), *bamReader, bamInfo.getIndex(), stateInfo, *ioAdapter));
618             } else {
619                 dbiIterator.reset(new SequentialDbiIterator(referenceId, !bamInfo.isUnmappedSelected(), *iterator, stateInfo, *ioAdapter));
620             }
621 
622             SAFE_POINT_EXT(importers.contains(referenceId), throw Exception("An unexpected assembly"), totalReadsImported);
623             importers[referenceId]->createAssembly(dstDbiRef, U2ObjectDbi::ROOT_FOLDER, dbiIterator.data(), importInfo, assembly);
624             CHECK_EXT(!hasError(), throw Exception(getError()), totalReadsImported);
625             CHECK_EXT(!isCanceled(), throw CancelledException(BAMDbiPlugin::tr("Task was cancelled")), totalReadsImported);
626 
627             totalReadsImported += dbiIterator->getReadsImported();
628             taskLog.details(tr("Successfully imported %1 reads for assembly '%2' (total %3 reads imported)")
629                                 .arg(dbiIterator->getReadsImported())
630                                 .arg(assembly.visualName)
631                                 .arg(totalReadsImported));
632         } else {
633             if (!bamInfo.hasIndex()) {
634                 while (iterator->hasNext() && iterator->peekReferenceId() == referenceId) {
635                     iterator->skip();
636                 }
637                 if (isCanceled()) {
638                     throw CancelledException(BAMDbiPlugin::tr("Task was cancelled"));
639                 }
640             }
641         }
642     }
643 
644     return totalReadsImported;
645 }
646 
importUnmappedSortedReads(BamReader * bamReader,Reader * reader,QScopedPointer<Iterator> & iterator,IOAdapter * ioAdapter)647 qint64 ConvertToSQLiteTask::importUnmappedSortedReads(BamReader *bamReader, Reader *reader, QScopedPointer<Iterator> &iterator, IOAdapter *ioAdapter) {
648     taskLog.details(tr("Importing unmapped reads"));
649 
650     if (bamInfo.hasIndex() && !reader->getHeader().getReferences().isEmpty()) {
651         const Index &index = bamInfo.getIndex();
652         bool maxOffsetFound = false;
653         VirtualOffset maxOffset = VirtualOffset(0, 0);
654 
655         for (int refId = 0; refId < reader->getHeader().getReferences().size(); ++refId) {
656             foreach (const Index::ReferenceIndex::Bin &bin, index.getReferenceIndices()[refId].getBins()) {
657                 foreach (const Index::ReferenceIndex::Chunk &chunk, bin.getChunks()) {
658                     if (chunk.getStart() < chunk.getEnd() && maxOffset < chunk.getStart()) {
659                         maxOffset = chunk.getStart();
660                         maxOffsetFound = true;
661                     }
662                 }
663             }
664         }
665 
666         if (maxOffsetFound) {
667             bamReader->seek(maxOffset);
668             iterator.reset(new BamIterator(*bamReader));
669             while (iterator->hasNext() && iterator->peekReferenceId() != -1) {
670                 iterator->skip();
671             }
672         } else {
673             iterator.reset(new BamIterator(*bamReader));
674         }
675     }
676 
677     SequentialDbiIterator dbiIterator(-1, false, *iterator, stateInfo, *ioAdapter);
678 
679     U2Assembly assembly;
680     assembly.visualName = "Unmapped";
681 
682     SAFE_POINT_EXT(importers.contains(-1), throw Exception("An unexpected assembly"), 0);
683     importers[-1]->createAssembly(dstDbiRef, U2ObjectDbi::ROOT_FOLDER, &dbiIterator, importInfos[-1], assembly);
684     CHECK_EXT(!hasError(), throw Exception(getError()), dbiIterator.getReadsImported());
685     CHECK_EXT(!isCanceled(), throw CancelledException(BAMDbiPlugin::tr("Task was cancelled")), dbiIterator.getReadsImported());
686 
687     return dbiIterator.getReadsImported();
688 }
689 
importUnsortedReads(SamReader * samReader,BamReader * bamReader,Reader * reader,QMap<int,U2AssemblyReadsImportInfo> & importInfos)690 qint64 ConvertToSQLiteTask::importUnsortedReads(SamReader *samReader, BamReader *bamReader, Reader *reader, QMap<int, U2AssemblyReadsImportInfo> &importInfos) {
691     taskLog.details(tr("No bam index given, preparing sequential import"));
692 
693     for (int referenceId = 0; referenceId < reader->getHeader().getReferences().size(); referenceId++) {
694         if (bamInfo.isReferenceSelected(referenceId)) {
695             createAssemblyObjectForUnsortedReads(referenceId, reader, importInfos);
696         }
697     }
698 
699     if (bamInfo.isUnmappedSelected()) {
700         createAssemblyObjectForUnsortedReads(-1, reader, importInfos);
701     }
702 
703     taskLog.details(tr("Importing reads sequentially"));
704 
705     QScopedPointer<Iterator> inputIterator;
706     if (sam) {
707         inputIterator.reset(new SamIterator(*samReader));
708     } else {
709         inputIterator.reset(new BamIterator(*bamReader));
710     }
711 
712     QScopedPointer<SkipUnmappedIterator> skipUnmappedIterator;
713     Iterator *iterator = nullptr;
714     if (!bamInfo.isUnmappedSelected()) {
715         skipUnmappedIterator.reset(new SkipUnmappedIterator(*inputIterator));
716         iterator = skipUnmappedIterator.data();
717     } else {
718         iterator = inputIterator.data();
719     }
720 
721     return importReadsSequentially(iterator);
722 }
723 
createAssemblyObjectForUnsortedReads(int referenceId,Reader * reader,QMap<int,U2::U2AssemblyReadsImportInfo> & importInfos)724 void ConvertToSQLiteTask::createAssemblyObjectForUnsortedReads(int referenceId, Reader *reader, QMap<int, U2::U2AssemblyReadsImportInfo> &importInfos) {
725     U2Assembly assembly;
726     assembly.visualName = (referenceId == -1 ? "Unmapped" : reader->getHeader().getReferences()[referenceId].getName());
727 
728     SAFE_POINT_EXT(importers.contains(referenceId), throw Exception("An unexpected assembly"), );
729     importers[referenceId]->createAssembly(dstDbiRef, U2ObjectDbi::ROOT_FOLDER, assembly);
730 
731     CHECK_EXT(!hasError(), throw Exception(getError()), );
732     CHECK_EXT(!isCanceled(), throw CancelledException(BAMDbiPlugin::tr("Task was cancelled")), );
733 
734     importInfos[referenceId].packed = false;
735 }
736 
importReadsSequentially(Iterator * iterator)737 qint64 ConvertToSQLiteTask::importReadsSequentially(Iterator *iterator) {
738     qint64 totalReadsImported = 0;
739 
740     U2OpStatusImpl opStatus;
741 
742     while (iterator->hasNext()) {
743         QMap<int, QList<U2AssemblyRead>> reads;
744 
745         int readCount = 0;
746         while (iterator->hasNext() && readCount < READS_CHUNK_SIZE) {
747             const int referenceId = iterator->peekReferenceId();
748             if ((-1 == referenceId && bamInfo.isUnmappedSelected()) ||
749                 bamInfo.isReferenceSelected(referenceId)) {
750                 U2AssemblyReadsImportInfo &importInfo = importInfos[referenceId];
751                 reads[referenceId] << iterator->next();
752                 readCount++;
753                 importInfo.nReads++;
754             } else {
755                 iterator->skip();
756             }
757         }
758 
759         CHECK_EXT(!isCanceled(), throw CancelledException(BAMDbiPlugin::tr("Task was cancelled")), totalReadsImported);
760 
761         flushReads(reads);
762         CHECK_EXT(!opStatus.isCoR(), throw Exception(opStatus.getError()), totalReadsImported);
763         totalReadsImported += readCount;
764     }
765 
766     return totalReadsImported;
767 }
768 
flushReads(const QMap<int,QList<U2AssemblyRead>> & reads)769 void ConvertToSQLiteTask::flushReads(const QMap<int, QList<U2AssemblyRead>> &reads) {
770     foreach (int index, reads.keys()) {
771         if (!reads[index].isEmpty()) {
772             BufferedDbiIterator<U2AssemblyRead> readsIterator(reads[index]);
773             SAFE_POINT_EXT(importers.contains(index), throw Exception("An unexpected assembly"), );
774             importers[index]->addReads(&readsIterator);
775         }
776     }
777 }
778 
updateReferenceLengthAttribute(int length,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)779 void ConvertToSQLiteTask::updateReferenceLengthAttribute(int length, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
780     U2IntegerAttribute lenAttr;
781     lenAttr.objectId = assembly.id;
782     lenAttr.name = U2BaseAttributeName::reference_length;
783     lenAttr.version = assembly.version;
784     lenAttr.value = length;
785 
786     U2OpStatusImpl status;
787     attributeDbi->createIntegerAttribute(lenAttr, status);
788     if (status.hasError()) {
789         throw Exception(status.getError());
790     }
791 }
792 
updateReferenceMd5Attribute(const QByteArray & md5,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)793 void ConvertToSQLiteTask::updateReferenceMd5Attribute(const QByteArray &md5, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
794     CHECK(!md5.isEmpty(), );
795 
796     U2ByteArrayAttribute md5Attr;
797     md5Attr.objectId = assembly.id;
798     md5Attr.name = U2BaseAttributeName::reference_md5;
799     md5Attr.version = assembly.version;
800     md5Attr.value = md5;
801 
802     U2OpStatusImpl status;
803     attributeDbi->createByteArrayAttribute(md5Attr, status);
804     if (status.hasError()) {
805         throw Exception(status.getError());
806     }
807 }
808 
updateReferenceSpeciesAttribute(const QByteArray & species,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)809 void ConvertToSQLiteTask::updateReferenceSpeciesAttribute(const QByteArray &species, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
810     CHECK(!species.isEmpty(), );
811 
812     U2ByteArrayAttribute speciesAttr;
813     speciesAttr.objectId = assembly.id;
814     speciesAttr.name = U2BaseAttributeName::reference_species;
815     speciesAttr.version = assembly.version;
816     speciesAttr.value = species;
817 
818     U2OpStatusImpl status;
819     attributeDbi->createByteArrayAttribute(speciesAttr, status);
820     if (status.hasError()) {
821         throw Exception(status.getError());
822     }
823 }
824 
updateReferenceUriAttribute(const QString & uri,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)825 void ConvertToSQLiteTask::updateReferenceUriAttribute(const QString &uri, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
826     CHECK(!uri.isEmpty(), );
827 
828     U2StringAttribute uriAttr;
829     uriAttr.objectId = assembly.id;
830     uriAttr.name = U2BaseAttributeName::reference_uri;
831     uriAttr.version = assembly.version;
832     uriAttr.value = uri;
833 
834     U2OpStatusImpl status;
835     attributeDbi->createStringAttribute(uriAttr, status);
836     if (status.hasError()) {
837         throw Exception(status.getError());
838     }
839 }
840 
updateImportInfoMaxProwAttribute(const U2AssemblyReadsImportInfo & importInfo,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)841 void ConvertToSQLiteTask::updateImportInfoMaxProwAttribute(const U2AssemblyReadsImportInfo &importInfo, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
842     const qint64 maxProw = importInfo.packStat.maxProw;
843 
844     if (maxProw > 0) {
845         U2IntegerAttribute maxProwAttr;
846         maxProwAttr.objectId = assembly.id;
847         maxProwAttr.name = U2BaseAttributeName::max_prow;
848         maxProwAttr.version = assembly.version;
849         maxProwAttr.value = maxProw;
850 
851         U2OpStatusImpl opStatus;
852         attributeDbi->createIntegerAttribute(maxProwAttr, opStatus);
853         if (opStatus.hasError()) {
854             throw Exception(opStatus.getError());
855         }
856     } else if (importInfo.packStat.readsCount > 0) {
857         // if there are reads, but maxProw == 0 => error
858         taskLog.details(tr("Warning: incorrect maxProw == %1, probably packing was not done! Attribute was not set").arg(maxProw));
859     }
860 }
861 
updateImportInfoReadsCountAttribute(const U2AssemblyReadsImportInfo & importInfo,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)862 void ConvertToSQLiteTask::updateImportInfoReadsCountAttribute(const U2AssemblyReadsImportInfo &importInfo, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
863     const qint64 readsCount = importInfo.packStat.readsCount;
864     CHECK(readsCount > 0, );
865 
866     U2IntegerAttribute countReadsAttr;
867     countReadsAttr.objectId = assembly.id;
868     countReadsAttr.name = "count_reads_attribute";
869     countReadsAttr.version = assembly.version;
870     countReadsAttr.value = readsCount;
871 
872     U2OpStatusImpl opStatus;
873     attributeDbi->createIntegerAttribute(countReadsAttr, opStatus);
874     if (opStatus.hasError()) {
875         throw Exception(opStatus.getError());
876     }
877 }
878 
updateImportInfoCoverageStatAttribute(const U2AssemblyReadsImportInfo & importInfo,const U2Assembly & assembly,U2AttributeDbi * attributeDbi)879 void ConvertToSQLiteTask::updateImportInfoCoverageStatAttribute(const U2AssemblyReadsImportInfo &importInfo, const U2Assembly &assembly, U2AttributeDbi *attributeDbi) {
880     const U2AssemblyCoverageStat &coverageStat = importInfo.coverageInfo.coverage;
881     CHECK(!coverageStat.isEmpty(), );
882 
883     U2ByteArrayAttribute attribute;
884     attribute.objectId = assembly.id;
885     attribute.name = U2BaseAttributeName::coverage_statistics;
886     attribute.value = U2AssemblyUtils::serializeCoverageStat(coverageStat);
887     attribute.version = assembly.version;
888 
889     U2OpStatusImpl opStatus;
890     attributeDbi->createByteArrayAttribute(attribute, opStatus);
891     if (opStatus.hasError()) {
892         throw Exception(opStatus.getError());
893     }
894 }
895 
prepareIoAdapter()896 IOAdapter *ConvertToSQLiteTask::prepareIoAdapter() {
897     IOAdapterFactory *factory = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(IOAdapterUtils::url2io(sourceUrl));
898     SAFE_POINT_EXT(nullptr != factory, throw IOException(L10N::nullPointerError("IO adapter factory")), nullptr);
899     QScopedPointer<IOAdapter> ioAdapter(factory->createIOAdapter());
900 
901     if (!ioAdapter->open(sourceUrl, IOAdapterMode_Read)) {
902         throw IOException(L10N::errorReadingFile(sourceUrl));
903     }
904 
905     return ioAdapter.take();
906 }
907 
908 }  // namespace BAM
909 }  // namespace U2
910