1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "MultiTableAssemblyAdapter.h"
23 
24 #include <U2Core/Timer.h>
25 #include <U2Core/U2AssemblyUtils.h>
26 #include <U2Core/U2OpStatusUtils.h>
27 #include <U2Core/U2SafePoints.h>
28 #include <U2Core/U2SqlHelpers.h>
29 
30 #include "../SQLiteAssemblyDbi.h"
31 #include "../SQLiteDbi.h"
32 #include "../SQLiteObjectDbi.h"
33 
34 namespace U2 {
35 
36 #define DEFAULT_ROWS_PER_TABLE 5000
37 
MultiTableAssemblyAdapter(SQLiteDbi * _dbi,const U2DataId & assemblyId,const AssemblyCompressor * compressor,DbRef * db,U2OpStatus & os)38 MultiTableAssemblyAdapter::MultiTableAssemblyAdapter(SQLiteDbi *_dbi, const U2DataId &assemblyId, const AssemblyCompressor *compressor, DbRef *db, U2OpStatus &os)
39     : SQLiteAssemblyAdapter(assemblyId, compressor, db) {
40     dbi = _dbi;
41     version = -1;
42     syncTables(os);
43     rowsPerRange = DEFAULT_ROWS_PER_TABLE;
44 }
45 
~MultiTableAssemblyAdapter()46 MultiTableAssemblyAdapter::~MultiTableAssemblyAdapter() {
47     clearTableAdaptersInfo();
48 }
49 
clearTableAdaptersInfo()50 void MultiTableAssemblyAdapter::clearTableAdaptersInfo() {
51     qDeleteAll(adapters);
52     adaptersGrid.clear();
53     idExtras.clear();
54     elenRanges.clear();
55 }
56 
syncTables(U2OpStatus & os)57 void MultiTableAssemblyAdapter::syncTables(U2OpStatus &os) {
58     qint64 versionInDb = dbi->getObjectDbi()->getObjectVersion(assemblyId, os);
59     if (versionInDb <= version) {
60         return;
61     }
62     SQLiteReadQuery q("SELECT idata FROM Assembly WHERE object = ?1", db, os);
63     q.bindDataId(1, assemblyId);
64     if (q.step()) {
65         QByteArray data = q.getBlob(0);
66         rereadTables(data, os);
67         if (!os.hasError()) {
68             version = versionInDb;
69         }
70     }
71 }
72 
toRange(const QVector<int> & startPos)73 static QVector<U2Region> toRange(const QVector<int> &startPos) {
74     QVector<U2Region> res;
75     int prev = 0;
76     foreach (int pos, startPos) {
77         res << U2Region(prev, pos - prev);
78         prev = pos;
79     }
80     return res;
81 }
82 
initTables(const QList<U2AssemblyRead> & reads,U2OpStatus & os)83 void MultiTableAssemblyAdapter::initTables(const QList<U2AssemblyRead> &reads, U2OpStatus &os) {
84     if (os.hasError()) {
85         return;
86     }
87     SAFE_POINT(elenRanges.isEmpty(), "Effective ranges are already initialized!", );
88 
89     int nReads = reads.size();
90     if (false && nReads > 1000) {
91         /*    // get reads distribution first
92         QVector<int> distribution(nReads / 10, 0);
93         int* data = distribution.data();
94         foreach(const U2AssemblyRead& read, reads) {
95             int elen = read->readSequence.size() + U2AssemblyUtils::getCigarExtraLength(read->cigar);
96             int idx = elen / 10;
97             data[idx]++;
98         }
99         // derive regions
100         // TODO:*/
101     } else {
102         QVector<int> starts;
103         starts << 50 << 100 << 200 << 400 << 800 << 4 * 1000 << 25 * 1000 << 100 * 1000 << 500 * 1000 << 2 * 1000 * 1000;
104         elenRanges << toRange(starts);
105     }
106 
107     initAdaptersGrid(1, elenRanges.size());
108 
109     flushTables(os);
110 }
111 
rereadTables(const QByteArray & idata,U2OpStatus & os)112 void MultiTableAssemblyAdapter::rereadTables(const QByteArray &idata, U2OpStatus &os) {
113     QWriteLocker wl(&tablesSyncLock);
114 
115     clearTableAdaptersInfo();
116 
117     // format: N, N, N, N | N, N |.....
118     // elements are separated by | sign. First element encodes ranges, second prow step and max prow, others are for future extension
119     if (idata.isEmpty()) {
120         // assembly is empty - no index data was created
121         return;
122     }
123     QList<QByteArray> elements = idata.split('|');
124     if (elements.size() < 2) {
125         os.setError(U2DbiL10n::tr("Failed to detect assembly storage format: %1").arg(idata.constData()));
126         return;
127     }
128     QByteArray elenData = elements.at(0);
129     QByteArray prowData = elements.at(1);
130 
131     QList<QByteArray> elenTokens = elenData.split(',');
132     U2Region prev(-1, 1);
133     bool parseOk = true;
134     foreach (const QByteArray &elenTok, elenTokens) {
135         int start = elenTok.toInt(&parseOk);
136         if (!parseOk || start < prev.endPos()) {
137             os.setError(QString("Failed to parse range: %1, full: %2").arg(elenTok.constData()).arg(elenData.constData()));
138             return;
139         }
140         U2Region region(prev.endPos(), start - prev.endPos());
141         elenRanges << region;
142         prev = region;
143     }
144     elenRanges << U2Region(prev.endPos(), INT_MAX);
145 
146     QList<QByteArray> prowTokens = prowData.split(',');
147     int prange = prowTokens.at(0).toInt(&parseOk);
148     if (prange < 1 || !parseOk) {
149         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
150         return;
151     }
152     if (prowTokens.size() != 2) {
153         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
154         return;
155     }
156     int nRows = prowTokens.at(1).toInt(&parseOk);
157     if (nRows < 0 || !parseOk) {
158         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
159         return;
160     }
161 
162     // ok, all regions parsed, now create adapters
163     int nElens = elenRanges.size();
164     initAdaptersGrid(nRows, nElens);
165     for (int rowPos = 0; rowPos < nRows; rowPos++) {
166         for (int elenPos = 0; elenPos < nElens; elenPos++) {
167             QString suffix = getTableSuffix(rowPos, elenPos);
168             QString tableName = SingleTableAssemblyAdapter::getReadsTableName(assemblyId, 'M', suffix);
169             if (SQLiteUtils::isTableExists(tableName, db, os)) {
170                 createAdapter(rowPos, elenPos, os);
171             }
172         }
173     }
174 }
175 
flushTables(U2OpStatus & os)176 void MultiTableAssemblyAdapter::flushTables(U2OpStatus &os) {
177     bool empty = adaptersGrid.isEmpty();
178     if (empty) {
179         // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
180         initTables(QList<U2AssemblyRead>(), os);
181         if (os.hasError()) {
182             return;
183         }
184     }
185     QByteArray idata;
186     for (int i = 0; i < elenRanges.size(); i++) {
187         int rangeStart = elenRanges[i].startPos;
188         if (!idata.isEmpty()) {
189             idata.append(',');
190         }
191         idata.append(QByteArray::number(rangeStart));
192     }
193     idata.append('|').append(QByteArray::number(rowsPerRange)).append(',').append(QByteArray::number(adaptersGrid.size()));
194 
195     SQLiteWriteQuery q("UPDATE Assembly SET idata = ?1 WHERE object = ?2", db, os);
196     q.bindBlob(1, idata);
197     q.bindDataId(2, assemblyId);
198     q.execute();
199 }
200 
getTableSuffix(int rowPos,int elenPos)201 QString MultiTableAssemblyAdapter::getTableSuffix(int rowPos, int elenPos) {
202     U2Region eRegion = elenRanges[elenPos];
203     bool last = (elenPos + 1 == elenRanges.size());
204     return QString("%1_%2_%3").arg(eRegion.startPos).arg(last ? QString("U") : QString::number(eRegion.endPos())).arg(rowPos);
205 }
206 
initAdaptersGrid(int nRows,int nElens)207 void MultiTableAssemblyAdapter::initAdaptersGrid(int nRows, int nElens) {
208     assert(adaptersGrid.isEmpty());
209     adaptersGrid.resize(nRows);
210     for (int i = 0; i < nRows; i++) {
211         adaptersGrid[i] = QVector<MTASingleTableAdapter *>(nElens, nullptr);
212     }
213 }
214 
createAdapter(int rowPos,int elenPos,U2OpStatus & os)215 MTASingleTableAdapter *MultiTableAssemblyAdapter::createAdapter(int rowPos, int elenPos, U2OpStatus &os) {
216     assert(adaptersGrid.at(rowPos).at(elenPos) == nullptr);
217 
218     QString suffix = getTableSuffix(rowPos, elenPos);
219     SingleTableAssemblyAdapter *sa = new SingleTableAssemblyAdapter(dbi, assemblyId, 'M', suffix, compressor, db, os);
220     const U2Region &elenRange = elenRanges.at(elenPos);
221     sa->enableRangeTableMode(elenRange.startPos, elenRange.endPos());
222     QByteArray idExtra = getIdExtra(rowPos, elenPos);
223     MTASingleTableAdapter *ma = new MTASingleTableAdapter(sa, rowPos, elenPos, idExtra);
224     ma->singleTableAdapter->createReadsTables(os);
225     adapters << ma;
226     idExtras << idExtra;
227     adaptersGrid[rowPos][elenPos] = ma;
228     return ma;
229 }
230 
createReadsIndexes(U2OpStatus & os)231 void MultiTableAssemblyAdapter::createReadsIndexes(U2OpStatus &os) {
232     SQLiteWriteQuery("PRAGMA temp_store = FILE", db, os).execute();
233     CHECK_OP(os, );
234     foreach (MTASingleTableAdapter *a, adapters) {
235         a->singleTableAdapter->createReadsIndexes(os);
236         if (os.hasError()) {
237             break;
238         }
239     }
240     SQLiteWriteQuery("PRAGMA temp_store = MEMORY", db, os).execute();
241 }
242 
getIdExtra(int rowPos,int elenPos)243 QByteArray MultiTableAssemblyAdapter::getIdExtra(int rowPos, int elenPos) {
244     QByteArray res(4, 0);
245     qint16 *data = (qint16 *)res.data();
246     data[0] = (qint16)rowPos;
247     data[1] = (qint16)elenPos;
248     return res;
249 }
250 
countReads(const U2Region & r,U2OpStatus & os)251 qint64 MultiTableAssemblyAdapter::countReads(const U2Region &r, U2OpStatus &os) {
252     bool all = r == U2_REGION_MAX;
253     qint64 sum = 0;
254     // use more sensitive algorithm for smaller regions with low amount of reads
255     // and not-very sensitive for huge regions with a lot of reads
256     int nReadsToUseNotPreciseAlgorithms = 1000 / (r.length + 1);
257     foreach (MTASingleTableAdapter *a, adapters) {
258         int n = a->singleTableAdapter->countReads(r, os);
259         if (n != 0 && !all && n < nReadsToUseNotPreciseAlgorithms) {
260             n = a->singleTableAdapter->countReadsPrecise(r, os);
261         }
262         if (os.hasError()) {
263             break;
264         }
265         sum += n;
266     }
267     return sum;
268 }
269 
getMaxPackedRow(const U2Region & r,U2OpStatus & os)270 qint64 MultiTableAssemblyAdapter::getMaxPackedRow(const U2Region &r, U2OpStatus &os) {
271     qint64 max = 0;
272     // process only hi row adapters
273     int nRows = adaptersGrid.size();
274     for (int rowPos = nRows; --rowPos >= 0 && max == 0;) {
275         QVector<MTASingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
276         for (int elenPos = 0, nElens = elenAdapters.size(); elenPos < nElens; elenPos++) {
277             MTASingleTableAdapter *a = elenAdapters.at(elenPos);
278             if (a == nullptr) {
279                 continue;
280             }
281             assert(a->rowPos == rowPos);
282             qint64 n = a->singleTableAdapter->getMaxPackedRow(r, os);
283             assert(U2Region(rowsPerRange * rowPos, rowsPerRange).contains(n));
284             max = qMax(max, n);
285         }
286     }
287     return max;
288 }
289 
getMaxEndPos(U2OpStatus & os)290 qint64 MultiTableAssemblyAdapter::getMaxEndPos(U2OpStatus &os) {
291     // TODO: optimize by using gstart + maxReadLen for first n-1 tables
292     qint64 max = 0;
293     foreach (MTASingleTableAdapter *a, adapters) {
294         qint64 n = a->singleTableAdapter->getMaxEndPos(os);
295         if (os.hasError()) {
296             break;
297         }
298         max = qMax(max, n);
299     }
300     return max;
301 }
302 
getReads(const U2Region & r,U2OpStatus & os,bool sortedHint)303 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReads(const U2Region &r, U2OpStatus &os, bool sortedHint) {
304     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
305     foreach (MTASingleTableAdapter *a, adapters) {
306         iterators << a->singleTableAdapter->getReads(r, os, sortedHint);
307         if (os.hasError()) {
308             break;
309         }
310     }
311     if (os.hasError()) {
312         qDeleteAll(iterators);
313         return nullptr;
314     }
315     return new MTAReadsIterator(iterators, idExtras, sortedHint);
316 }
317 
getReadsByRow(const U2Region & r,qint64 minRow,qint64 maxRow,U2OpStatus & os)318 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReadsByRow(const U2Region &r, qint64 minRow, qint64 maxRow, U2OpStatus &os) {
319     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
320     QVector<QByteArray> selectedIdExtras;
321     U2Region targetRowRange(minRow, maxRow - minRow);
322     foreach (MTASingleTableAdapter *a, adapters) {
323         const U2Region rowRegion(a->rowPos * rowsPerRange, rowsPerRange);
324         if (!rowRegion.intersects(targetRowRange)) {
325             continue;
326         }
327         iterators << a->singleTableAdapter->getReadsByRow(r, minRow, maxRow, os);
328         selectedIdExtras << a->idExtra;
329         if (os.hasError()) {
330             break;
331         }
332     }
333     if (os.hasError()) {
334         qDeleteAll(iterators);
335         return nullptr;
336     }
337     return new MTAReadsIterator(iterators, selectedIdExtras, false);
338 }
339 
getReadsByName(const QByteArray & name,U2OpStatus & os)340 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReadsByName(const QByteArray &name, U2OpStatus &os) {
341     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
342     foreach (MTASingleTableAdapter *a, adapters) {
343         iterators << a->singleTableAdapter->getReadsByName(name, os);
344         if (os.hasError()) {
345             break;
346         }
347     }
348     if (os.hasError()) {
349         qDeleteAll(iterators);
350         return nullptr;
351     }
352     return new MTAReadsIterator(iterators, idExtras, false);
353 }
354 
getElenRangePosById(const U2DataId & id) const355 int MultiTableAssemblyAdapter::getElenRangePosById(const U2DataId &id) const {
356     QByteArray extra = U2DbiUtils::toDbExtra(id);
357 
358     SAFE_POINT(extra.size() == 4, QString("Illegal assembly read ID extra part! HEX: %1").arg(extra.toHex().constData()), -1);
359 
360     const qint16 *data = (const qint16 *)extra.constData();
361     return int(data[1]);
362 }
363 
getElenRangePosByLength(qint64 readLength) const364 int MultiTableAssemblyAdapter::getElenRangePosByLength(qint64 readLength) const {
365     int nElenRanges = elenRanges.size();
366     for (int i = 0; i < nElenRanges; i++) {
367         const U2Region &r = elenRanges[i];
368         if (r.contains(readLength)) {
369             return i;
370         }
371     }
372     FAIL(QString("Read length does not fit any range: %1, number of ranges: %2").arg(readLength).arg(nElenRanges), nElenRanges - 1);
373 }
374 
getRowRangePosByRow(quint64 row) const375 int MultiTableAssemblyAdapter::getRowRangePosByRow(quint64 row) const {
376     return row / rowsPerRange;
377 }
378 
getRowRangePosById(const U2DataId & id) const379 int MultiTableAssemblyAdapter::getRowRangePosById(const U2DataId &id) const {
380     QByteArray extra = U2DbiUtils::toDbExtra(id);
381 
382     SAFE_POINT(extra.size() == 4, QString("Extra part size of assembly read ID is not correct! HEX(Extra): %1").arg(extra.toHex().constData()), -1);
383 
384     const qint16 *data = (const qint16 *)extra.constData();
385     return int(data[0]);
386 }
387 
addTable2Id(const U2DataId & id,const QByteArray & idExtra)388 static U2DataId addTable2Id(const U2DataId &id, const QByteArray &idExtra) {
389     assert(U2DbiUtils::toDbExtra(id).isEmpty());
390     U2DataId res = U2DbiUtils::toU2DataId(U2DbiUtils::toDbiId(id), U2Type::AssemblyRead, idExtra);
391     return res;
392 }
393 
ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> & grid,int rowPos,int nElens)394 static void ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> &grid, int rowPos, int nElens) {
395     int oldRows = grid.size();
396     if (oldRows > rowPos) {
397         return;
398     }
399     int newRows = rowPos + 1;
400     grid.resize(newRows);
401     for (int r = oldRows; r < newRows; r++) {
402         grid[r].resize(nElens);
403     }
404 }
405 
406 #define N_READS_TO_FLUSH_TOTAL 100000
407 #define N_READS_TO_FLUSH_PER_RANGE 10000
408 
addReads(U2DbiIterator<U2AssemblyRead> * it,U2AssemblyReadsImportInfo & ii,U2OpStatus & os)409 void MultiTableAssemblyAdapter::addReads(U2DbiIterator<U2AssemblyRead> *it, U2AssemblyReadsImportInfo &ii, U2OpStatus &os) {
410     bool empty = adaptersGrid.isEmpty();
411     if (empty) {
412         // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
413         initTables(QList<U2AssemblyRead>(), os);
414         if (os.hasError()) {
415             return;
416         }
417     }
418 
419     bool packIsOn = empty;
420     qint64 prevLeftmostPos = -1;
421     PackAlgorithmContext packContext;
422 
423     QVector<QVector<QList<U2AssemblyRead>>> readsGrid;  // reads sorted by range
424     bool lastIteration = false;
425     qint64 readsInGrid = 0;
426     while (!os.isCoR()) {
427         int nElens = elenRanges.size();
428         if (it->hasNext()) {
429             U2AssemblyRead read = it->next();
430             if (os.isCoR()) {
431                 break;
432             }
433             int readLen = read->readSequence.length();
434             read->effectiveLen = readLen + U2AssemblyUtils::getCigarExtraLength(read->cigar);
435             int elenPos = getElenRangePosByLength(read->effectiveLen);
436 
437             packIsOn = packIsOn && read->leftmostPos >= prevLeftmostPos;
438             read->packedViewRow = packIsOn ? AssemblyPackAlgorithm::packRead(U2Region(read->leftmostPos, read->effectiveLen), packContext, os) : 0;
439             int rowPos = getRowRangePosByRow(read->packedViewRow);
440             ensureGridSize(readsGrid, rowPos, nElens);
441             readsGrid[rowPos][elenPos] << read;
442             ++ii.nReads;
443             ++readsInGrid;
444         } else {
445             lastIteration = true;
446         }
447 
448         int nRows = readsGrid.size();
449         if (lastIteration || readsInGrid > N_READS_TO_FLUSH_TOTAL) {
450             for (int rowPos = 0; rowPos < nRows && !os.isCoR(); rowPos++) {
451                 for (int elenPos = 0; elenPos < nElens && !os.isCoR(); elenPos++) {
452                     QList<U2AssemblyRead> &rangeReads = readsGrid[rowPos][elenPos];
453                     int nRangeReads = rangeReads.size();
454                     if (nRangeReads == 0 || (!lastIteration && nRangeReads < N_READS_TO_FLUSH_PER_RANGE)) {
455                         continue;
456                     }
457                     MTASingleTableAdapter *adapter = getAdapterByRowAndElenRange(rowPos, elenPos, true, os);
458                     U2AssemblyReadsImportInfo rangeReadsImportInfo;
459                     // pass the same coverage info through all adapters to accumulate coverage
460                     rangeReadsImportInfo.coverageInfo = ii.coverageInfo;
461                     BufferedDbiIterator<U2AssemblyRead> rangeReadsIterator(rangeReads);
462                     adapter->singleTableAdapter->addReads(&rangeReadsIterator, rangeReadsImportInfo, os);
463                     ii.coverageInfo = rangeReadsImportInfo.coverageInfo;
464                     readsInGrid -= rangeReads.size();
465                     rangeReads.clear();
466                 }
467             }
468         }
469         if (lastIteration) {
470             break;
471         }
472     }
473 
474     if (packIsOn && !os.hasError()) {
475         ii.packStat.readsCount = ii.nReads;
476         ii.packStat.maxProw = packContext.maxProw;
477         ii.packed = true;
478         flushTables(os);
479     }
480 }
481 
getAdapterByRowAndElenRange(int rowPos,int elenPos,bool createIfNotExits,U2OpStatus & os)482 MTASingleTableAdapter *MultiTableAssemblyAdapter::getAdapterByRowAndElenRange(int rowPos, int elenPos, bool createIfNotExits, U2OpStatus &os) {
483     int nElens = elenRanges.size();
484     assert(elenPos < nElens);
485     if (rowPos >= adaptersGrid.size()) {
486         assert(createIfNotExits);
487         if (!createIfNotExits) {
488             return nullptr;
489         }
490         int oldRowSize = adaptersGrid.size();
491         int newRowSize = rowPos + 1;
492         adaptersGrid.resize(newRowSize);
493         for (int i = oldRowSize; i < newRowSize; i++) {
494             adaptersGrid[i].resize(nElens);
495         }
496     }
497     QVector<MTASingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
498     assert(elenAdapters.size() == nElens);
499     MTASingleTableAdapter *adapter = elenAdapters.at(elenPos);
500     if (adapter == nullptr && createIfNotExits) {
501         adapter = createAdapter(rowPos, elenPos, os);
502     }
503     return adapter;
504 }
505 
removeReads(const QList<U2DataId> & readIds,U2OpStatus & os)506 void MultiTableAssemblyAdapter::removeReads(const QList<U2DataId> &readIds, U2OpStatus &os) {
507     int nReads = readIds.size();
508 
509     QHash<MTASingleTableAdapter *, QList<U2DataId>> readsByAdapter;
510     for (int i = 0; i < nReads; i++) {
511         const U2DataId &readId = readIds[i];
512         int rowPos = getRowRangePosById(readId);
513         int elenPos = getElenRangePosById(readId);
514         MTASingleTableAdapter *a = getAdapterByRowAndElenRange(rowPos, elenPos, false, os);
515 
516         SAFE_POINT(a != nullptr, QString("No table adapter was found! row: %1, elen: %2").arg(rowPos).arg(elenPos), );
517 
518         if (!readsByAdapter.contains(a)) {
519             readsByAdapter[a] = QList<U2DataId>();
520         }
521         readsByAdapter[a].append(readId);
522     }
523     foreach (MTASingleTableAdapter *a, readsByAdapter.keys()) {
524         QList<U2DataId> &rangeReadIds = readsByAdapter[a];
525         a->singleTableAdapter->removeReads(rangeReadIds, os);
526         // TODO: remove adapters for empty tables. And tables as well
527     }
528 }
529 
dropReadsTables(U2OpStatus & os)530 void MultiTableAssemblyAdapter::dropReadsTables(U2OpStatus &os) {
531     // remove prepared queries to finalize them and prevent SQLite errors on table drop
532     db->preparedQueries.clear();
533 
534     for (QVector<MTASingleTableAdapter *> adaptersVector : qAsConst(adaptersGrid)) {
535         for (MTASingleTableAdapter *adapter : qAsConst(adaptersVector)) {
536             if (adapter != nullptr) {
537                 adapter->singleTableAdapter->dropReadsTables(os);
538             }
539         }
540     }
541 }
542 
pack(U2AssemblyPackStat & stat,U2OpStatus & os)543 void MultiTableAssemblyAdapter::pack(U2AssemblyPackStat &stat, U2OpStatus &os) {
544     MultiTablePackAlgorithmAdapter packAdapter(this);
545 
546     AssemblyPackAlgorithm::pack(packAdapter, stat, os);
547     packAdapter.releaseDbResources();
548 
549     quint64 t0 = GTimer::currentTimeMicros();
550     packAdapter.migrateAll(os);
551     perfLog.trace(QString("Assembly: table migration pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
552 
553     flushTables(os);
554 }
555 
calculateCoverage(const U2Region & region,U2AssemblyCoverageStat & coverage,U2OpStatus & os)556 void MultiTableAssemblyAdapter::calculateCoverage(const U2Region &region, U2AssemblyCoverageStat &coverage, U2OpStatus &os) {
557     for (int i = 0; i < adapters.size(); ++i) {
558         MTASingleTableAdapter *a = adapters.at(i);
559         a->singleTableAdapter->calculateCoverage(region, coverage, os);
560         if (os.isCoR()) {
561             break;
562         }
563 
564         os.setProgress((double(i + 1) / adapters.size()) * 100);
565     }
566 }
567 
568 //////////////////////////////////////////////////////////////////////////
569 // pack adapter
570 
MultiTablePackAlgorithmAdapter(MultiTableAssemblyAdapter * ma)571 MultiTablePackAlgorithmAdapter::MultiTablePackAlgorithmAdapter(MultiTableAssemblyAdapter *ma) {
572     multiTableAdapter = ma;
573     DbRef *db = multiTableAdapter->getDbRef();
574     int nElens = multiTableAdapter->getNumberOfElenRanges();
575     ensureGridSize(nElens);
576     foreach (MTASingleTableAdapter *a, multiTableAdapter->getAdapters()) {
577         SingleTablePackAlgorithmAdapter *sa = new SingleTablePackAlgorithmAdapter(db, a->singleTableAdapter->getReadsTableName());
578         packAdapters << sa;
579         if (packAdaptersGrid.size() <= a->rowPos) {
580             packAdaptersGrid.resize(a->rowPos + 1);
581         }
582         if (packAdaptersGrid[a->rowPos].size() <= a->elenPos) {
583             packAdaptersGrid[a->rowPos].resize(a->elenPos + 1);
584         }
585         packAdaptersGrid[a->rowPos][a->elenPos] = sa;
586     }
587 }
588 
selectAllReads(U2OpStatus & os)589 U2DbiIterator<PackAlgorithmData> *MultiTablePackAlgorithmAdapter::selectAllReads(U2OpStatus &os) {
590     QVector<U2DbiIterator<PackAlgorithmData> *> iterators;
591     foreach (SingleTablePackAlgorithmAdapter *a, packAdapters) {
592         iterators << a->selectAllReads(os);
593     }
594     return new MTAPackAlgorithmDataIterator(iterators, multiTableAdapter->getIdExtrasPerRange());
595 }
596 
~MultiTablePackAlgorithmAdapter()597 MultiTablePackAlgorithmAdapter::~MultiTablePackAlgorithmAdapter() {
598     qDeleteAll(packAdapters);
599 }
600 
assignProw(const U2DataId & readId,qint64 prow,U2OpStatus & os)601 void MultiTablePackAlgorithmAdapter::assignProw(const U2DataId &readId, qint64 prow, U2OpStatus &os) {
602     int elenPos = multiTableAdapter->getElenRangePosById(readId);
603     int oldRowPos = multiTableAdapter->getRowRangePosById(readId);
604     int newRowPos = multiTableAdapter->getRowRangePosByRow(prow);
605 
606     SingleTablePackAlgorithmAdapter *sa = nullptr;
607     if (newRowPos == oldRowPos) {
608         sa = packAdaptersGrid[oldRowPos][elenPos];
609         sa->assignProw(readId, prow, os);
610         return;
611     }
612     ensureGridSize(newRowPos + 1);
613 
614     sa = packAdaptersGrid[newRowPos][elenPos];
615     MTASingleTableAdapter *oldA = multiTableAdapter->getAdapterByRowAndElenRange(oldRowPos, elenPos, false, os);
616     MTASingleTableAdapter *newA = multiTableAdapter->getAdapterByRowAndElenRange(newRowPos, elenPos, true, os);
617 
618     SAFE_POINT(oldA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(oldRowPos).arg(elenPos), );
619     SAFE_POINT(newA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(newRowPos).arg(elenPos), );
620     SAFE_POINT_OP(os, );
621 
622     if (sa == nullptr) {
623         sa = new SingleTablePackAlgorithmAdapter(multiTableAdapter->getDbRef(), newA->singleTableAdapter->getReadsTableName());
624         packAdapters << sa;
625         packAdaptersGrid[newRowPos][elenPos] = sa;
626     }
627 
628     QVector<SQLiteReadTableMigrationData> &newTableData = migrations[newA];
629     newTableData.append(SQLiteReadTableMigrationData(U2DbiUtils::toDbiId(readId), oldA, prow));
630     // TODO: add mem check here!
631 }
632 
releaseDbResources()633 void MultiTablePackAlgorithmAdapter::releaseDbResources() {
634     foreach (SingleTablePackAlgorithmAdapter *a, packAdapters) {
635         a->releaseDbResources();
636     }
637 }
638 
migrate(MTASingleTableAdapter * newA,const QVector<SQLiteReadTableMigrationData> & data,qint64 migratedBefore,qint64 totalMigrationCount,U2OpStatus & os)639 void MultiTablePackAlgorithmAdapter::migrate(MTASingleTableAdapter *newA, const QVector<SQLiteReadTableMigrationData> &data, qint64 migratedBefore, qint64 totalMigrationCount, U2OpStatus &os) {
640     SAFE_POINT_OP(os, );
641     // delete reads from old tables, and insert into new one
642     QHash<MTASingleTableAdapter *, QVector<SQLiteReadTableMigrationData>> readsByOldTable;
643     foreach (const SQLiteReadTableMigrationData &d, data) {
644         readsByOldTable[d.oldTable].append(d);
645     }
646     DbRef *db = multiTableAdapter->getDbRef();
647     foreach (MTASingleTableAdapter *oldA, readsByOldTable.keys()) {
648         const QVector<SQLiteReadTableMigrationData> &migData = readsByOldTable[oldA];
649         if (migData.isEmpty()) {
650             continue;
651         }
652         QString oldTable = oldA->singleTableAdapter->getReadsTableName();
653         QString newTable = newA->singleTableAdapter->getReadsTableName();
654         QString idsTable = "tmp_mig_" + oldTable;  // TODO
655 
656 #ifdef _DEBUG
657         qint64 nOldReads1 = SQLiteReadQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
658         qint64 nNewReads1 = SQLiteReadQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
659         int readsMoved = migData.size();
660         int rowsPerRange = multiTableAdapter->getRowsPerRange();
661         U2Region newProwRegion(newA->rowPos * rowsPerRange, rowsPerRange);
662 #endif
663 
664         perfLog.trace(QString("Assembly: running reads migration from %1 to %2 number of reads: %3").arg(oldTable).arg(newTable).arg(migData.size()));
665         quint64 t0 = GTimer::currentTimeMicros();
666 
667         {  // nested block is needed to ensure all queries are finalized
668 
669             SQLiteWriteQuery(QString("CREATE TEMPORARY TABLE %1(id INTEGER PRIMARY KEY, prow INTEGER NOT NULL)").arg(idsTable), db, os).execute();
670             SQLiteWriteQuery insertIds(QString("INSERT INTO %1(id, prow) VALUES(?1, ?2)").arg(idsTable), db, os);
671             for (const SQLiteReadTableMigrationData &d : qAsConst(migData)) {
672                 insertIds.reset(false);
673                 insertIds.bindInt64(1, d.readId);
674                 insertIds.bindInt32(2, d.newProw);
675 #ifdef _DEBUG
676                 assert(newProwRegion.contains(d.newProw));
677 #endif
678                 insertIds.execute();
679                 if (os.hasError()) {
680                     break;
681                 }
682             }
683 
684             SQLiteWriteQuery(QString("INSERT INTO %1(prow, name, gstart, elen, flags, mq, data) "
685                                      "SELECT %3.prow, name, gstart, elen, flags, mq, data FROM %2, %3 WHERE %2.id = %3.id")
686                                  .arg(newTable)
687                                  .arg(oldTable)
688                                  .arg(idsTable),
689                              db,
690                              os)
691                 .execute();
692 
693             SQLiteWriteQuery(QString("DELETE FROM %1 WHERE id IN (SELECT id FROM %2)").arg(oldTable).arg(idsTable), db, os).execute();
694         }
695         U2OpStatusImpl osStub;  // using stub here -> this operation must be performed even if any of internal queries failed
696         SQLiteWriteQuery(QString("DROP TABLE IF EXISTS %1").arg(idsTable), db, osStub).execute();
697 
698         qint64 nMigrated = migratedBefore + migData.size();
699         perfLog.trace(QString("Assembly: reads migration from %1 to %2 finished, time %3 seconds, progress: %4/%5 (%6%)")
700                           .arg(oldTable)
701                           .arg(newTable)
702                           .arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000))
703                           .arg(nMigrated)
704                           .arg(totalMigrationCount)
705                           .arg(100 * nMigrated / totalMigrationCount));
706 
707 #ifdef _DEBUG
708         qint64 nOldReads2 = SQLiteReadQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
709         qint64 nNewReads2 = SQLiteReadQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
710         assert(nOldReads1 + nNewReads1 == nOldReads2 + nNewReads2);
711         assert(nNewReads1 + readsMoved == nNewReads2);
712 #endif
713     }
714 }
715 
migrateAll(U2OpStatus & os)716 void MultiTablePackAlgorithmAdapter::migrateAll(U2OpStatus &os) {
717     SAFE_POINT_OP(os, );
718 
719     qint64 nReadsToMigrate = 0;
720     foreach (MTASingleTableAdapter *newTable, migrations.keys()) {
721         const QVector<SQLiteReadTableMigrationData> &data = migrations[newTable];
722         nReadsToMigrate += data.size();
723     }
724     if (nReadsToMigrate == 0) {
725         return;
726     }
727     qint64 nReadsTotal = multiTableAdapter->countReads(U2_REGION_MAX, os);
728     qint64 migrationPercent = nReadsToMigrate * 100 / nReadsTotal;
729 
730     perfLog.trace(QString("Assembly: starting reads migration process. Reads to migrate: %1, total: %2 (%3%)").arg(nReadsToMigrate).arg(nReadsTotal).arg(migrationPercent));
731 
732 #define MAX_PERCENT_TO_REINDEX 20
733     if (migrationPercent > MAX_PERCENT_TO_REINDEX) {
734         perfLog.trace("Assembly: dropping old indexes first");
735         foreach (MTASingleTableAdapter *adapter, multiTableAdapter->getAdapters()) {
736             adapter->singleTableAdapter->dropReadsIndexes(os);
737         }
738         perfLog.trace("Assembly: indexes are dropped");
739     }
740 
741     SAFE_POINT_OP(os, );
742     int nMigrated = 0;
743     foreach (MTASingleTableAdapter *newTable, migrations.keys()) {
744         const QVector<SQLiteReadTableMigrationData> &data = migrations[newTable];
745         migrate(newTable, data, nMigrated, nReadsToMigrate, os);
746         nMigrated += data.size();
747     }
748     migrations.clear();
749 }
750 
ensureGridSize(int nRows)751 void MultiTablePackAlgorithmAdapter::ensureGridSize(int nRows) {
752     int oldNRows = packAdaptersGrid.size();
753     if (oldNRows < nRows) {
754         int nElens = multiTableAdapter->getNumberOfElenRanges();
755         packAdaptersGrid.resize(nRows);
756         for (int i = oldNRows; i < nRows; i++) {
757             packAdaptersGrid[i].resize(nElens);
758         }
759     }
760 }
761 
762 //////////////////////////////////////////////////////////////////////////
763 // MTAReadsIterator
764 
MTAReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> * > & i,const QVector<QByteArray> & ie,bool sorted)765 MTAReadsIterator::MTAReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> *> &i, const QVector<QByteArray> &ie, bool sorted)
766     : iterators(i), currentRange(0), idExtras(ie), sortedHint(sorted) {
767 }
768 
~MTAReadsIterator()769 MTAReadsIterator::~MTAReadsIterator() {
770     qDeleteAll(iterators);
771 }
772 
773 // TODO: remove copy-paste from this code
hasNext()774 bool MTAReadsIterator::hasNext() {
775     if (sortedHint) {
776         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
777             if (it->hasNext()) {
778                 return true;
779             }
780         }
781         return false;
782     } else {
783         bool res = currentRange < iterators.size();
784         if (res) {
785             do {
786                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
787                 res = it->hasNext();
788                 if (res) {
789                     break;
790                 }
791                 currentRange++;
792             } while (currentRange < iterators.size());
793         }
794         return res;
795     }
796 }
797 
next()798 U2AssemblyRead MTAReadsIterator::next() {
799     U2AssemblyRead res;
800     if (sortedHint) {
801         qint64 minPos = LLONG_MAX;
802         U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
803         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
804             if (it->hasNext()) {
805                 U2AssemblyRead candidate = it->peek();
806                 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
807                 if (candidate->leftmostPos < minPos) {
808                     minIt = it;
809                     minPos = candidate->leftmostPos;
810                 }
811             }
812         }
813         if (nullptr != minIt) {
814             res = minIt->next();
815             SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
816             int currentIt = iterators.indexOf(minIt);
817             const QByteArray &idExtra = idExtras.at(currentIt);
818             res->id = addTable2Id(res->id, idExtra);
819         }
820         return res;
821     } else {
822         if (currentRange < iterators.size()) {
823             do {
824                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
825                 if (it->hasNext()) {
826                     res = it->next();
827                     SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
828                     const QByteArray &idExtra = idExtras.at(currentRange);
829                     res->id = addTable2Id(res->id, idExtra);
830                     break;
831                 }
832                 currentRange++;
833             } while (currentRange < iterators.size());
834         }
835         return res;
836     }
837 }
838 
peek()839 U2AssemblyRead MTAReadsIterator::peek() {
840     U2AssemblyRead res;
841     if (sortedHint) {
842         qint64 minPos = LLONG_MAX;
843         U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
844         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
845             if (it->hasNext()) {
846                 U2AssemblyRead candidate = it->peek();
847                 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
848                 if (candidate->leftmostPos < minPos) {
849                     minIt = it;
850                     minPos = candidate->leftmostPos;
851                 }
852             }
853         }
854         if (nullptr != minIt) {
855             res = minIt->next();
856             SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
857             int currentIt = iterators.indexOf(minIt);
858             const QByteArray &idExtra = idExtras.at(currentIt);
859             res->id = addTable2Id(res->id, idExtra);
860         }
861         return res;
862     } else {
863         if (currentRange < iterators.size()) {
864             do {
865                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
866                 if (it->hasNext()) {
867                     res = it->peek();
868                     SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
869                     const QByteArray &idExtra = idExtras.at(currentRange);
870                     res->id = addTable2Id(res->id, idExtra);
871                     break;
872                 }
873                 currentRange++;
874             } while (currentRange < iterators.size());
875         }
876         return res;
877     }
878 }
879 
880 //////////////////////////////////////////////////////////////////////////
881 // MTAPackAlgorithmDataIterator
882 
MTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> * > & i,const QVector<QByteArray> & ie)883 MTAPackAlgorithmDataIterator::MTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> *> &i, const QVector<QByteArray> &ie)
884     : iterators(i), idExtras(ie) {
885     fetchNextData();
886 }
887 
~MTAPackAlgorithmDataIterator()888 MTAPackAlgorithmDataIterator::~MTAPackAlgorithmDataIterator() {
889     qDeleteAll(iterators);
890 }
891 
hasNext()892 bool MTAPackAlgorithmDataIterator::hasNext() {
893     return !nextData.readId.isEmpty();
894 }
895 
next()896 PackAlgorithmData MTAPackAlgorithmDataIterator::next() {
897     PackAlgorithmData res = nextData;
898     fetchNextData();
899     return res;
900 }
901 
peek()902 PackAlgorithmData MTAPackAlgorithmDataIterator::peek() {
903     return nextData;
904 }
905 
fetchNextData()906 void MTAPackAlgorithmDataIterator::fetchNextData() {
907     PackAlgorithmData bestCandidate;
908     int bestRange = 0;
909     for (int i = 0; i < iterators.size(); i++) {
910         U2DbiIterator<PackAlgorithmData> *it = iterators[i];
911         if (!it->hasNext()) {
912             continue;
913         }
914         PackAlgorithmData d = it->peek();
915         if (bestCandidate.readId.isEmpty() || bestCandidate.leftmostPos > d.leftmostPos) {
916             bestCandidate = d;
917             bestRange = i;
918         }
919     }
920     nextData = bestCandidate;
921     if (!nextData.readId.isEmpty()) {
922         iterators[bestRange]->next();
923         const QByteArray &idExtra = idExtras.at(bestRange);
924         nextData.readId = addTable2Id(nextData.readId, idExtra);
925     }
926 }
927 
928 }  // namespace U2
929