1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "MultiTableAssemblyAdapter.h"
23
24 #include <U2Core/Timer.h>
25 #include <U2Core/U2AssemblyUtils.h>
26 #include <U2Core/U2OpStatusUtils.h>
27 #include <U2Core/U2SafePoints.h>
28 #include <U2Core/U2SqlHelpers.h>
29
30 #include "../SQLiteAssemblyDbi.h"
31 #include "../SQLiteDbi.h"
32 #include "../SQLiteObjectDbi.h"
33
34 namespace U2 {
35
36 #define DEFAULT_ROWS_PER_TABLE 5000
37
MultiTableAssemblyAdapter(SQLiteDbi * _dbi,const U2DataId & assemblyId,const AssemblyCompressor * compressor,DbRef * db,U2OpStatus & os)38 MultiTableAssemblyAdapter::MultiTableAssemblyAdapter(SQLiteDbi *_dbi, const U2DataId &assemblyId, const AssemblyCompressor *compressor, DbRef *db, U2OpStatus &os)
39 : SQLiteAssemblyAdapter(assemblyId, compressor, db) {
40 dbi = _dbi;
41 version = -1;
42 syncTables(os);
43 rowsPerRange = DEFAULT_ROWS_PER_TABLE;
44 }
45
~MultiTableAssemblyAdapter()46 MultiTableAssemblyAdapter::~MultiTableAssemblyAdapter() {
47 clearTableAdaptersInfo();
48 }
49
clearTableAdaptersInfo()50 void MultiTableAssemblyAdapter::clearTableAdaptersInfo() {
51 qDeleteAll(adapters);
52 adaptersGrid.clear();
53 idExtras.clear();
54 elenRanges.clear();
55 }
56
syncTables(U2OpStatus & os)57 void MultiTableAssemblyAdapter::syncTables(U2OpStatus &os) {
58 qint64 versionInDb = dbi->getObjectDbi()->getObjectVersion(assemblyId, os);
59 if (versionInDb <= version) {
60 return;
61 }
62 SQLiteReadQuery q("SELECT idata FROM Assembly WHERE object = ?1", db, os);
63 q.bindDataId(1, assemblyId);
64 if (q.step()) {
65 QByteArray data = q.getBlob(0);
66 rereadTables(data, os);
67 if (!os.hasError()) {
68 version = versionInDb;
69 }
70 }
71 }
72
toRange(const QVector<int> & startPos)73 static QVector<U2Region> toRange(const QVector<int> &startPos) {
74 QVector<U2Region> res;
75 int prev = 0;
76 foreach (int pos, startPos) {
77 res << U2Region(prev, pos - prev);
78 prev = pos;
79 }
80 return res;
81 }
82
initTables(const QList<U2AssemblyRead> & reads,U2OpStatus & os)83 void MultiTableAssemblyAdapter::initTables(const QList<U2AssemblyRead> &reads, U2OpStatus &os) {
84 if (os.hasError()) {
85 return;
86 }
87 SAFE_POINT(elenRanges.isEmpty(), "Effective ranges are already initialized!", );
88
89 int nReads = reads.size();
90 if (false && nReads > 1000) {
91 /* // get reads distribution first
92 QVector<int> distribution(nReads / 10, 0);
93 int* data = distribution.data();
94 foreach(const U2AssemblyRead& read, reads) {
95 int elen = read->readSequence.size() + U2AssemblyUtils::getCigarExtraLength(read->cigar);
96 int idx = elen / 10;
97 data[idx]++;
98 }
99 // derive regions
100 // TODO:*/
101 } else {
102 QVector<int> starts;
103 starts << 50 << 100 << 200 << 400 << 800 << 4 * 1000 << 25 * 1000 << 100 * 1000 << 500 * 1000 << 2 * 1000 * 1000;
104 elenRanges << toRange(starts);
105 }
106
107 initAdaptersGrid(1, elenRanges.size());
108
109 flushTables(os);
110 }
111
rereadTables(const QByteArray & idata,U2OpStatus & os)112 void MultiTableAssemblyAdapter::rereadTables(const QByteArray &idata, U2OpStatus &os) {
113 QWriteLocker wl(&tablesSyncLock);
114
115 clearTableAdaptersInfo();
116
117 // format: N, N, N, N | N, N |.....
118 // elements are separated by | sign. First element encodes ranges, second prow step and max prow, others are for future extension
119 if (idata.isEmpty()) {
120 // assembly is empty - no index data was created
121 return;
122 }
123 QList<QByteArray> elements = idata.split('|');
124 if (elements.size() < 2) {
125 os.setError(U2DbiL10n::tr("Failed to detect assembly storage format: %1").arg(idata.constData()));
126 return;
127 }
128 QByteArray elenData = elements.at(0);
129 QByteArray prowData = elements.at(1);
130
131 QList<QByteArray> elenTokens = elenData.split(',');
132 U2Region prev(-1, 1);
133 bool parseOk = true;
134 foreach (const QByteArray &elenTok, elenTokens) {
135 int start = elenTok.toInt(&parseOk);
136 if (!parseOk || start < prev.endPos()) {
137 os.setError(QString("Failed to parse range: %1, full: %2").arg(elenTok.constData()).arg(elenData.constData()));
138 return;
139 }
140 U2Region region(prev.endPos(), start - prev.endPos());
141 elenRanges << region;
142 prev = region;
143 }
144 elenRanges << U2Region(prev.endPos(), INT_MAX);
145
146 QList<QByteArray> prowTokens = prowData.split(',');
147 int prange = prowTokens.at(0).toInt(&parseOk);
148 if (prange < 1 || !parseOk) {
149 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
150 return;
151 }
152 if (prowTokens.size() != 2) {
153 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
154 return;
155 }
156 int nRows = prowTokens.at(1).toInt(&parseOk);
157 if (nRows < 0 || !parseOk) {
158 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
159 return;
160 }
161
162 // ok, all regions parsed, now create adapters
163 int nElens = elenRanges.size();
164 initAdaptersGrid(nRows, nElens);
165 for (int rowPos = 0; rowPos < nRows; rowPos++) {
166 for (int elenPos = 0; elenPos < nElens; elenPos++) {
167 QString suffix = getTableSuffix(rowPos, elenPos);
168 QString tableName = SingleTableAssemblyAdapter::getReadsTableName(assemblyId, 'M', suffix);
169 if (SQLiteUtils::isTableExists(tableName, db, os)) {
170 createAdapter(rowPos, elenPos, os);
171 }
172 }
173 }
174 }
175
flushTables(U2OpStatus & os)176 void MultiTableAssemblyAdapter::flushTables(U2OpStatus &os) {
177 bool empty = adaptersGrid.isEmpty();
178 if (empty) {
179 // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
180 initTables(QList<U2AssemblyRead>(), os);
181 if (os.hasError()) {
182 return;
183 }
184 }
185 QByteArray idata;
186 for (int i = 0; i < elenRanges.size(); i++) {
187 int rangeStart = elenRanges[i].startPos;
188 if (!idata.isEmpty()) {
189 idata.append(',');
190 }
191 idata.append(QByteArray::number(rangeStart));
192 }
193 idata.append('|').append(QByteArray::number(rowsPerRange)).append(',').append(QByteArray::number(adaptersGrid.size()));
194
195 SQLiteWriteQuery q("UPDATE Assembly SET idata = ?1 WHERE object = ?2", db, os);
196 q.bindBlob(1, idata);
197 q.bindDataId(2, assemblyId);
198 q.execute();
199 }
200
getTableSuffix(int rowPos,int elenPos)201 QString MultiTableAssemblyAdapter::getTableSuffix(int rowPos, int elenPos) {
202 U2Region eRegion = elenRanges[elenPos];
203 bool last = (elenPos + 1 == elenRanges.size());
204 return QString("%1_%2_%3").arg(eRegion.startPos).arg(last ? QString("U") : QString::number(eRegion.endPos())).arg(rowPos);
205 }
206
initAdaptersGrid(int nRows,int nElens)207 void MultiTableAssemblyAdapter::initAdaptersGrid(int nRows, int nElens) {
208 assert(adaptersGrid.isEmpty());
209 adaptersGrid.resize(nRows);
210 for (int i = 0; i < nRows; i++) {
211 adaptersGrid[i] = QVector<MTASingleTableAdapter *>(nElens, nullptr);
212 }
213 }
214
createAdapter(int rowPos,int elenPos,U2OpStatus & os)215 MTASingleTableAdapter *MultiTableAssemblyAdapter::createAdapter(int rowPos, int elenPos, U2OpStatus &os) {
216 assert(adaptersGrid.at(rowPos).at(elenPos) == nullptr);
217
218 QString suffix = getTableSuffix(rowPos, elenPos);
219 SingleTableAssemblyAdapter *sa = new SingleTableAssemblyAdapter(dbi, assemblyId, 'M', suffix, compressor, db, os);
220 const U2Region &elenRange = elenRanges.at(elenPos);
221 sa->enableRangeTableMode(elenRange.startPos, elenRange.endPos());
222 QByteArray idExtra = getIdExtra(rowPos, elenPos);
223 MTASingleTableAdapter *ma = new MTASingleTableAdapter(sa, rowPos, elenPos, idExtra);
224 ma->singleTableAdapter->createReadsTables(os);
225 adapters << ma;
226 idExtras << idExtra;
227 adaptersGrid[rowPos][elenPos] = ma;
228 return ma;
229 }
230
createReadsIndexes(U2OpStatus & os)231 void MultiTableAssemblyAdapter::createReadsIndexes(U2OpStatus &os) {
232 SQLiteWriteQuery("PRAGMA temp_store = FILE", db, os).execute();
233 CHECK_OP(os, );
234 foreach (MTASingleTableAdapter *a, adapters) {
235 a->singleTableAdapter->createReadsIndexes(os);
236 if (os.hasError()) {
237 break;
238 }
239 }
240 SQLiteWriteQuery("PRAGMA temp_store = MEMORY", db, os).execute();
241 }
242
getIdExtra(int rowPos,int elenPos)243 QByteArray MultiTableAssemblyAdapter::getIdExtra(int rowPos, int elenPos) {
244 QByteArray res(4, 0);
245 qint16 *data = (qint16 *)res.data();
246 data[0] = (qint16)rowPos;
247 data[1] = (qint16)elenPos;
248 return res;
249 }
250
countReads(const U2Region & r,U2OpStatus & os)251 qint64 MultiTableAssemblyAdapter::countReads(const U2Region &r, U2OpStatus &os) {
252 bool all = r == U2_REGION_MAX;
253 qint64 sum = 0;
254 // use more sensitive algorithm for smaller regions with low amount of reads
255 // and not-very sensitive for huge regions with a lot of reads
256 int nReadsToUseNotPreciseAlgorithms = 1000 / (r.length + 1);
257 foreach (MTASingleTableAdapter *a, adapters) {
258 int n = a->singleTableAdapter->countReads(r, os);
259 if (n != 0 && !all && n < nReadsToUseNotPreciseAlgorithms) {
260 n = a->singleTableAdapter->countReadsPrecise(r, os);
261 }
262 if (os.hasError()) {
263 break;
264 }
265 sum += n;
266 }
267 return sum;
268 }
269
getMaxPackedRow(const U2Region & r,U2OpStatus & os)270 qint64 MultiTableAssemblyAdapter::getMaxPackedRow(const U2Region &r, U2OpStatus &os) {
271 qint64 max = 0;
272 // process only hi row adapters
273 int nRows = adaptersGrid.size();
274 for (int rowPos = nRows; --rowPos >= 0 && max == 0;) {
275 QVector<MTASingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
276 for (int elenPos = 0, nElens = elenAdapters.size(); elenPos < nElens; elenPos++) {
277 MTASingleTableAdapter *a = elenAdapters.at(elenPos);
278 if (a == nullptr) {
279 continue;
280 }
281 assert(a->rowPos == rowPos);
282 qint64 n = a->singleTableAdapter->getMaxPackedRow(r, os);
283 assert(U2Region(rowsPerRange * rowPos, rowsPerRange).contains(n));
284 max = qMax(max, n);
285 }
286 }
287 return max;
288 }
289
getMaxEndPos(U2OpStatus & os)290 qint64 MultiTableAssemblyAdapter::getMaxEndPos(U2OpStatus &os) {
291 // TODO: optimize by using gstart + maxReadLen for first n-1 tables
292 qint64 max = 0;
293 foreach (MTASingleTableAdapter *a, adapters) {
294 qint64 n = a->singleTableAdapter->getMaxEndPos(os);
295 if (os.hasError()) {
296 break;
297 }
298 max = qMax(max, n);
299 }
300 return max;
301 }
302
getReads(const U2Region & r,U2OpStatus & os,bool sortedHint)303 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReads(const U2Region &r, U2OpStatus &os, bool sortedHint) {
304 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
305 foreach (MTASingleTableAdapter *a, adapters) {
306 iterators << a->singleTableAdapter->getReads(r, os, sortedHint);
307 if (os.hasError()) {
308 break;
309 }
310 }
311 if (os.hasError()) {
312 qDeleteAll(iterators);
313 return nullptr;
314 }
315 return new MTAReadsIterator(iterators, idExtras, sortedHint);
316 }
317
getReadsByRow(const U2Region & r,qint64 minRow,qint64 maxRow,U2OpStatus & os)318 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReadsByRow(const U2Region &r, qint64 minRow, qint64 maxRow, U2OpStatus &os) {
319 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
320 QVector<QByteArray> selectedIdExtras;
321 U2Region targetRowRange(minRow, maxRow - minRow);
322 foreach (MTASingleTableAdapter *a, adapters) {
323 const U2Region rowRegion(a->rowPos * rowsPerRange, rowsPerRange);
324 if (!rowRegion.intersects(targetRowRange)) {
325 continue;
326 }
327 iterators << a->singleTableAdapter->getReadsByRow(r, minRow, maxRow, os);
328 selectedIdExtras << a->idExtra;
329 if (os.hasError()) {
330 break;
331 }
332 }
333 if (os.hasError()) {
334 qDeleteAll(iterators);
335 return nullptr;
336 }
337 return new MTAReadsIterator(iterators, selectedIdExtras, false);
338 }
339
getReadsByName(const QByteArray & name,U2OpStatus & os)340 U2DbiIterator<U2AssemblyRead> *MultiTableAssemblyAdapter::getReadsByName(const QByteArray &name, U2OpStatus &os) {
341 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
342 foreach (MTASingleTableAdapter *a, adapters) {
343 iterators << a->singleTableAdapter->getReadsByName(name, os);
344 if (os.hasError()) {
345 break;
346 }
347 }
348 if (os.hasError()) {
349 qDeleteAll(iterators);
350 return nullptr;
351 }
352 return new MTAReadsIterator(iterators, idExtras, false);
353 }
354
getElenRangePosById(const U2DataId & id) const355 int MultiTableAssemblyAdapter::getElenRangePosById(const U2DataId &id) const {
356 QByteArray extra = U2DbiUtils::toDbExtra(id);
357
358 SAFE_POINT(extra.size() == 4, QString("Illegal assembly read ID extra part! HEX: %1").arg(extra.toHex().constData()), -1);
359
360 const qint16 *data = (const qint16 *)extra.constData();
361 return int(data[1]);
362 }
363
getElenRangePosByLength(qint64 readLength) const364 int MultiTableAssemblyAdapter::getElenRangePosByLength(qint64 readLength) const {
365 int nElenRanges = elenRanges.size();
366 for (int i = 0; i < nElenRanges; i++) {
367 const U2Region &r = elenRanges[i];
368 if (r.contains(readLength)) {
369 return i;
370 }
371 }
372 FAIL(QString("Read length does not fit any range: %1, number of ranges: %2").arg(readLength).arg(nElenRanges), nElenRanges - 1);
373 }
374
getRowRangePosByRow(quint64 row) const375 int MultiTableAssemblyAdapter::getRowRangePosByRow(quint64 row) const {
376 return row / rowsPerRange;
377 }
378
getRowRangePosById(const U2DataId & id) const379 int MultiTableAssemblyAdapter::getRowRangePosById(const U2DataId &id) const {
380 QByteArray extra = U2DbiUtils::toDbExtra(id);
381
382 SAFE_POINT(extra.size() == 4, QString("Extra part size of assembly read ID is not correct! HEX(Extra): %1").arg(extra.toHex().constData()), -1);
383
384 const qint16 *data = (const qint16 *)extra.constData();
385 return int(data[0]);
386 }
387
addTable2Id(const U2DataId & id,const QByteArray & idExtra)388 static U2DataId addTable2Id(const U2DataId &id, const QByteArray &idExtra) {
389 assert(U2DbiUtils::toDbExtra(id).isEmpty());
390 U2DataId res = U2DbiUtils::toU2DataId(U2DbiUtils::toDbiId(id), U2Type::AssemblyRead, idExtra);
391 return res;
392 }
393
ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> & grid,int rowPos,int nElens)394 static void ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> &grid, int rowPos, int nElens) {
395 int oldRows = grid.size();
396 if (oldRows > rowPos) {
397 return;
398 }
399 int newRows = rowPos + 1;
400 grid.resize(newRows);
401 for (int r = oldRows; r < newRows; r++) {
402 grid[r].resize(nElens);
403 }
404 }
405
406 #define N_READS_TO_FLUSH_TOTAL 100000
407 #define N_READS_TO_FLUSH_PER_RANGE 10000
408
addReads(U2DbiIterator<U2AssemblyRead> * it,U2AssemblyReadsImportInfo & ii,U2OpStatus & os)409 void MultiTableAssemblyAdapter::addReads(U2DbiIterator<U2AssemblyRead> *it, U2AssemblyReadsImportInfo &ii, U2OpStatus &os) {
410 bool empty = adaptersGrid.isEmpty();
411 if (empty) {
412 // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
413 initTables(QList<U2AssemblyRead>(), os);
414 if (os.hasError()) {
415 return;
416 }
417 }
418
419 bool packIsOn = empty;
420 qint64 prevLeftmostPos = -1;
421 PackAlgorithmContext packContext;
422
423 QVector<QVector<QList<U2AssemblyRead>>> readsGrid; // reads sorted by range
424 bool lastIteration = false;
425 qint64 readsInGrid = 0;
426 while (!os.isCoR()) {
427 int nElens = elenRanges.size();
428 if (it->hasNext()) {
429 U2AssemblyRead read = it->next();
430 if (os.isCoR()) {
431 break;
432 }
433 int readLen = read->readSequence.length();
434 read->effectiveLen = readLen + U2AssemblyUtils::getCigarExtraLength(read->cigar);
435 int elenPos = getElenRangePosByLength(read->effectiveLen);
436
437 packIsOn = packIsOn && read->leftmostPos >= prevLeftmostPos;
438 read->packedViewRow = packIsOn ? AssemblyPackAlgorithm::packRead(U2Region(read->leftmostPos, read->effectiveLen), packContext, os) : 0;
439 int rowPos = getRowRangePosByRow(read->packedViewRow);
440 ensureGridSize(readsGrid, rowPos, nElens);
441 readsGrid[rowPos][elenPos] << read;
442 ++ii.nReads;
443 ++readsInGrid;
444 } else {
445 lastIteration = true;
446 }
447
448 int nRows = readsGrid.size();
449 if (lastIteration || readsInGrid > N_READS_TO_FLUSH_TOTAL) {
450 for (int rowPos = 0; rowPos < nRows && !os.isCoR(); rowPos++) {
451 for (int elenPos = 0; elenPos < nElens && !os.isCoR(); elenPos++) {
452 QList<U2AssemblyRead> &rangeReads = readsGrid[rowPos][elenPos];
453 int nRangeReads = rangeReads.size();
454 if (nRangeReads == 0 || (!lastIteration && nRangeReads < N_READS_TO_FLUSH_PER_RANGE)) {
455 continue;
456 }
457 MTASingleTableAdapter *adapter = getAdapterByRowAndElenRange(rowPos, elenPos, true, os);
458 U2AssemblyReadsImportInfo rangeReadsImportInfo;
459 // pass the same coverage info through all adapters to accumulate coverage
460 rangeReadsImportInfo.coverageInfo = ii.coverageInfo;
461 BufferedDbiIterator<U2AssemblyRead> rangeReadsIterator(rangeReads);
462 adapter->singleTableAdapter->addReads(&rangeReadsIterator, rangeReadsImportInfo, os);
463 ii.coverageInfo = rangeReadsImportInfo.coverageInfo;
464 readsInGrid -= rangeReads.size();
465 rangeReads.clear();
466 }
467 }
468 }
469 if (lastIteration) {
470 break;
471 }
472 }
473
474 if (packIsOn && !os.hasError()) {
475 ii.packStat.readsCount = ii.nReads;
476 ii.packStat.maxProw = packContext.maxProw;
477 ii.packed = true;
478 flushTables(os);
479 }
480 }
481
getAdapterByRowAndElenRange(int rowPos,int elenPos,bool createIfNotExits,U2OpStatus & os)482 MTASingleTableAdapter *MultiTableAssemblyAdapter::getAdapterByRowAndElenRange(int rowPos, int elenPos, bool createIfNotExits, U2OpStatus &os) {
483 int nElens = elenRanges.size();
484 assert(elenPos < nElens);
485 if (rowPos >= adaptersGrid.size()) {
486 assert(createIfNotExits);
487 if (!createIfNotExits) {
488 return nullptr;
489 }
490 int oldRowSize = adaptersGrid.size();
491 int newRowSize = rowPos + 1;
492 adaptersGrid.resize(newRowSize);
493 for (int i = oldRowSize; i < newRowSize; i++) {
494 adaptersGrid[i].resize(nElens);
495 }
496 }
497 QVector<MTASingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
498 assert(elenAdapters.size() == nElens);
499 MTASingleTableAdapter *adapter = elenAdapters.at(elenPos);
500 if (adapter == nullptr && createIfNotExits) {
501 adapter = createAdapter(rowPos, elenPos, os);
502 }
503 return adapter;
504 }
505
removeReads(const QList<U2DataId> & readIds,U2OpStatus & os)506 void MultiTableAssemblyAdapter::removeReads(const QList<U2DataId> &readIds, U2OpStatus &os) {
507 int nReads = readIds.size();
508
509 QHash<MTASingleTableAdapter *, QList<U2DataId>> readsByAdapter;
510 for (int i = 0; i < nReads; i++) {
511 const U2DataId &readId = readIds[i];
512 int rowPos = getRowRangePosById(readId);
513 int elenPos = getElenRangePosById(readId);
514 MTASingleTableAdapter *a = getAdapterByRowAndElenRange(rowPos, elenPos, false, os);
515
516 SAFE_POINT(a != nullptr, QString("No table adapter was found! row: %1, elen: %2").arg(rowPos).arg(elenPos), );
517
518 if (!readsByAdapter.contains(a)) {
519 readsByAdapter[a] = QList<U2DataId>();
520 }
521 readsByAdapter[a].append(readId);
522 }
523 foreach (MTASingleTableAdapter *a, readsByAdapter.keys()) {
524 QList<U2DataId> &rangeReadIds = readsByAdapter[a];
525 a->singleTableAdapter->removeReads(rangeReadIds, os);
526 // TODO: remove adapters for empty tables. And tables as well
527 }
528 }
529
dropReadsTables(U2OpStatus & os)530 void MultiTableAssemblyAdapter::dropReadsTables(U2OpStatus &os) {
531 // remove prepared queries to finalize them and prevent SQLite errors on table drop
532 db->preparedQueries.clear();
533
534 for (QVector<MTASingleTableAdapter *> adaptersVector : qAsConst(adaptersGrid)) {
535 for (MTASingleTableAdapter *adapter : qAsConst(adaptersVector)) {
536 if (adapter != nullptr) {
537 adapter->singleTableAdapter->dropReadsTables(os);
538 }
539 }
540 }
541 }
542
pack(U2AssemblyPackStat & stat,U2OpStatus & os)543 void MultiTableAssemblyAdapter::pack(U2AssemblyPackStat &stat, U2OpStatus &os) {
544 MultiTablePackAlgorithmAdapter packAdapter(this);
545
546 AssemblyPackAlgorithm::pack(packAdapter, stat, os);
547 packAdapter.releaseDbResources();
548
549 quint64 t0 = GTimer::currentTimeMicros();
550 packAdapter.migrateAll(os);
551 perfLog.trace(QString("Assembly: table migration pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
552
553 flushTables(os);
554 }
555
calculateCoverage(const U2Region & region,U2AssemblyCoverageStat & coverage,U2OpStatus & os)556 void MultiTableAssemblyAdapter::calculateCoverage(const U2Region ®ion, U2AssemblyCoverageStat &coverage, U2OpStatus &os) {
557 for (int i = 0; i < adapters.size(); ++i) {
558 MTASingleTableAdapter *a = adapters.at(i);
559 a->singleTableAdapter->calculateCoverage(region, coverage, os);
560 if (os.isCoR()) {
561 break;
562 }
563
564 os.setProgress((double(i + 1) / adapters.size()) * 100);
565 }
566 }
567
568 //////////////////////////////////////////////////////////////////////////
569 // pack adapter
570
MultiTablePackAlgorithmAdapter(MultiTableAssemblyAdapter * ma)571 MultiTablePackAlgorithmAdapter::MultiTablePackAlgorithmAdapter(MultiTableAssemblyAdapter *ma) {
572 multiTableAdapter = ma;
573 DbRef *db = multiTableAdapter->getDbRef();
574 int nElens = multiTableAdapter->getNumberOfElenRanges();
575 ensureGridSize(nElens);
576 foreach (MTASingleTableAdapter *a, multiTableAdapter->getAdapters()) {
577 SingleTablePackAlgorithmAdapter *sa = new SingleTablePackAlgorithmAdapter(db, a->singleTableAdapter->getReadsTableName());
578 packAdapters << sa;
579 if (packAdaptersGrid.size() <= a->rowPos) {
580 packAdaptersGrid.resize(a->rowPos + 1);
581 }
582 if (packAdaptersGrid[a->rowPos].size() <= a->elenPos) {
583 packAdaptersGrid[a->rowPos].resize(a->elenPos + 1);
584 }
585 packAdaptersGrid[a->rowPos][a->elenPos] = sa;
586 }
587 }
588
selectAllReads(U2OpStatus & os)589 U2DbiIterator<PackAlgorithmData> *MultiTablePackAlgorithmAdapter::selectAllReads(U2OpStatus &os) {
590 QVector<U2DbiIterator<PackAlgorithmData> *> iterators;
591 foreach (SingleTablePackAlgorithmAdapter *a, packAdapters) {
592 iterators << a->selectAllReads(os);
593 }
594 return new MTAPackAlgorithmDataIterator(iterators, multiTableAdapter->getIdExtrasPerRange());
595 }
596
~MultiTablePackAlgorithmAdapter()597 MultiTablePackAlgorithmAdapter::~MultiTablePackAlgorithmAdapter() {
598 qDeleteAll(packAdapters);
599 }
600
assignProw(const U2DataId & readId,qint64 prow,U2OpStatus & os)601 void MultiTablePackAlgorithmAdapter::assignProw(const U2DataId &readId, qint64 prow, U2OpStatus &os) {
602 int elenPos = multiTableAdapter->getElenRangePosById(readId);
603 int oldRowPos = multiTableAdapter->getRowRangePosById(readId);
604 int newRowPos = multiTableAdapter->getRowRangePosByRow(prow);
605
606 SingleTablePackAlgorithmAdapter *sa = nullptr;
607 if (newRowPos == oldRowPos) {
608 sa = packAdaptersGrid[oldRowPos][elenPos];
609 sa->assignProw(readId, prow, os);
610 return;
611 }
612 ensureGridSize(newRowPos + 1);
613
614 sa = packAdaptersGrid[newRowPos][elenPos];
615 MTASingleTableAdapter *oldA = multiTableAdapter->getAdapterByRowAndElenRange(oldRowPos, elenPos, false, os);
616 MTASingleTableAdapter *newA = multiTableAdapter->getAdapterByRowAndElenRange(newRowPos, elenPos, true, os);
617
618 SAFE_POINT(oldA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(oldRowPos).arg(elenPos), );
619 SAFE_POINT(newA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(newRowPos).arg(elenPos), );
620 SAFE_POINT_OP(os, );
621
622 if (sa == nullptr) {
623 sa = new SingleTablePackAlgorithmAdapter(multiTableAdapter->getDbRef(), newA->singleTableAdapter->getReadsTableName());
624 packAdapters << sa;
625 packAdaptersGrid[newRowPos][elenPos] = sa;
626 }
627
628 QVector<SQLiteReadTableMigrationData> &newTableData = migrations[newA];
629 newTableData.append(SQLiteReadTableMigrationData(U2DbiUtils::toDbiId(readId), oldA, prow));
630 // TODO: add mem check here!
631 }
632
releaseDbResources()633 void MultiTablePackAlgorithmAdapter::releaseDbResources() {
634 foreach (SingleTablePackAlgorithmAdapter *a, packAdapters) {
635 a->releaseDbResources();
636 }
637 }
638
migrate(MTASingleTableAdapter * newA,const QVector<SQLiteReadTableMigrationData> & data,qint64 migratedBefore,qint64 totalMigrationCount,U2OpStatus & os)639 void MultiTablePackAlgorithmAdapter::migrate(MTASingleTableAdapter *newA, const QVector<SQLiteReadTableMigrationData> &data, qint64 migratedBefore, qint64 totalMigrationCount, U2OpStatus &os) {
640 SAFE_POINT_OP(os, );
641 // delete reads from old tables, and insert into new one
642 QHash<MTASingleTableAdapter *, QVector<SQLiteReadTableMigrationData>> readsByOldTable;
643 foreach (const SQLiteReadTableMigrationData &d, data) {
644 readsByOldTable[d.oldTable].append(d);
645 }
646 DbRef *db = multiTableAdapter->getDbRef();
647 foreach (MTASingleTableAdapter *oldA, readsByOldTable.keys()) {
648 const QVector<SQLiteReadTableMigrationData> &migData = readsByOldTable[oldA];
649 if (migData.isEmpty()) {
650 continue;
651 }
652 QString oldTable = oldA->singleTableAdapter->getReadsTableName();
653 QString newTable = newA->singleTableAdapter->getReadsTableName();
654 QString idsTable = "tmp_mig_" + oldTable; // TODO
655
656 #ifdef _DEBUG
657 qint64 nOldReads1 = SQLiteReadQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
658 qint64 nNewReads1 = SQLiteReadQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
659 int readsMoved = migData.size();
660 int rowsPerRange = multiTableAdapter->getRowsPerRange();
661 U2Region newProwRegion(newA->rowPos * rowsPerRange, rowsPerRange);
662 #endif
663
664 perfLog.trace(QString("Assembly: running reads migration from %1 to %2 number of reads: %3").arg(oldTable).arg(newTable).arg(migData.size()));
665 quint64 t0 = GTimer::currentTimeMicros();
666
667 { // nested block is needed to ensure all queries are finalized
668
669 SQLiteWriteQuery(QString("CREATE TEMPORARY TABLE %1(id INTEGER PRIMARY KEY, prow INTEGER NOT NULL)").arg(idsTable), db, os).execute();
670 SQLiteWriteQuery insertIds(QString("INSERT INTO %1(id, prow) VALUES(?1, ?2)").arg(idsTable), db, os);
671 for (const SQLiteReadTableMigrationData &d : qAsConst(migData)) {
672 insertIds.reset(false);
673 insertIds.bindInt64(1, d.readId);
674 insertIds.bindInt32(2, d.newProw);
675 #ifdef _DEBUG
676 assert(newProwRegion.contains(d.newProw));
677 #endif
678 insertIds.execute();
679 if (os.hasError()) {
680 break;
681 }
682 }
683
684 SQLiteWriteQuery(QString("INSERT INTO %1(prow, name, gstart, elen, flags, mq, data) "
685 "SELECT %3.prow, name, gstart, elen, flags, mq, data FROM %2, %3 WHERE %2.id = %3.id")
686 .arg(newTable)
687 .arg(oldTable)
688 .arg(idsTable),
689 db,
690 os)
691 .execute();
692
693 SQLiteWriteQuery(QString("DELETE FROM %1 WHERE id IN (SELECT id FROM %2)").arg(oldTable).arg(idsTable), db, os).execute();
694 }
695 U2OpStatusImpl osStub; // using stub here -> this operation must be performed even if any of internal queries failed
696 SQLiteWriteQuery(QString("DROP TABLE IF EXISTS %1").arg(idsTable), db, osStub).execute();
697
698 qint64 nMigrated = migratedBefore + migData.size();
699 perfLog.trace(QString("Assembly: reads migration from %1 to %2 finished, time %3 seconds, progress: %4/%5 (%6%)")
700 .arg(oldTable)
701 .arg(newTable)
702 .arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000))
703 .arg(nMigrated)
704 .arg(totalMigrationCount)
705 .arg(100 * nMigrated / totalMigrationCount));
706
707 #ifdef _DEBUG
708 qint64 nOldReads2 = SQLiteReadQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
709 qint64 nNewReads2 = SQLiteReadQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
710 assert(nOldReads1 + nNewReads1 == nOldReads2 + nNewReads2);
711 assert(nNewReads1 + readsMoved == nNewReads2);
712 #endif
713 }
714 }
715
migrateAll(U2OpStatus & os)716 void MultiTablePackAlgorithmAdapter::migrateAll(U2OpStatus &os) {
717 SAFE_POINT_OP(os, );
718
719 qint64 nReadsToMigrate = 0;
720 foreach (MTASingleTableAdapter *newTable, migrations.keys()) {
721 const QVector<SQLiteReadTableMigrationData> &data = migrations[newTable];
722 nReadsToMigrate += data.size();
723 }
724 if (nReadsToMigrate == 0) {
725 return;
726 }
727 qint64 nReadsTotal = multiTableAdapter->countReads(U2_REGION_MAX, os);
728 qint64 migrationPercent = nReadsToMigrate * 100 / nReadsTotal;
729
730 perfLog.trace(QString("Assembly: starting reads migration process. Reads to migrate: %1, total: %2 (%3%)").arg(nReadsToMigrate).arg(nReadsTotal).arg(migrationPercent));
731
732 #define MAX_PERCENT_TO_REINDEX 20
733 if (migrationPercent > MAX_PERCENT_TO_REINDEX) {
734 perfLog.trace("Assembly: dropping old indexes first");
735 foreach (MTASingleTableAdapter *adapter, multiTableAdapter->getAdapters()) {
736 adapter->singleTableAdapter->dropReadsIndexes(os);
737 }
738 perfLog.trace("Assembly: indexes are dropped");
739 }
740
741 SAFE_POINT_OP(os, );
742 int nMigrated = 0;
743 foreach (MTASingleTableAdapter *newTable, migrations.keys()) {
744 const QVector<SQLiteReadTableMigrationData> &data = migrations[newTable];
745 migrate(newTable, data, nMigrated, nReadsToMigrate, os);
746 nMigrated += data.size();
747 }
748 migrations.clear();
749 }
750
ensureGridSize(int nRows)751 void MultiTablePackAlgorithmAdapter::ensureGridSize(int nRows) {
752 int oldNRows = packAdaptersGrid.size();
753 if (oldNRows < nRows) {
754 int nElens = multiTableAdapter->getNumberOfElenRanges();
755 packAdaptersGrid.resize(nRows);
756 for (int i = oldNRows; i < nRows; i++) {
757 packAdaptersGrid[i].resize(nElens);
758 }
759 }
760 }
761
762 //////////////////////////////////////////////////////////////////////////
763 // MTAReadsIterator
764
MTAReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> * > & i,const QVector<QByteArray> & ie,bool sorted)765 MTAReadsIterator::MTAReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> *> &i, const QVector<QByteArray> &ie, bool sorted)
766 : iterators(i), currentRange(0), idExtras(ie), sortedHint(sorted) {
767 }
768
~MTAReadsIterator()769 MTAReadsIterator::~MTAReadsIterator() {
770 qDeleteAll(iterators);
771 }
772
773 // TODO: remove copy-paste from this code
hasNext()774 bool MTAReadsIterator::hasNext() {
775 if (sortedHint) {
776 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
777 if (it->hasNext()) {
778 return true;
779 }
780 }
781 return false;
782 } else {
783 bool res = currentRange < iterators.size();
784 if (res) {
785 do {
786 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
787 res = it->hasNext();
788 if (res) {
789 break;
790 }
791 currentRange++;
792 } while (currentRange < iterators.size());
793 }
794 return res;
795 }
796 }
797
next()798 U2AssemblyRead MTAReadsIterator::next() {
799 U2AssemblyRead res;
800 if (sortedHint) {
801 qint64 minPos = LLONG_MAX;
802 U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
803 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
804 if (it->hasNext()) {
805 U2AssemblyRead candidate = it->peek();
806 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
807 if (candidate->leftmostPos < minPos) {
808 minIt = it;
809 minPos = candidate->leftmostPos;
810 }
811 }
812 }
813 if (nullptr != minIt) {
814 res = minIt->next();
815 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
816 int currentIt = iterators.indexOf(minIt);
817 const QByteArray &idExtra = idExtras.at(currentIt);
818 res->id = addTable2Id(res->id, idExtra);
819 }
820 return res;
821 } else {
822 if (currentRange < iterators.size()) {
823 do {
824 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
825 if (it->hasNext()) {
826 res = it->next();
827 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
828 const QByteArray &idExtra = idExtras.at(currentRange);
829 res->id = addTable2Id(res->id, idExtra);
830 break;
831 }
832 currentRange++;
833 } while (currentRange < iterators.size());
834 }
835 return res;
836 }
837 }
838
peek()839 U2AssemblyRead MTAReadsIterator::peek() {
840 U2AssemblyRead res;
841 if (sortedHint) {
842 qint64 minPos = LLONG_MAX;
843 U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
844 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
845 if (it->hasNext()) {
846 U2AssemblyRead candidate = it->peek();
847 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
848 if (candidate->leftmostPos < minPos) {
849 minIt = it;
850 minPos = candidate->leftmostPos;
851 }
852 }
853 }
854 if (nullptr != minIt) {
855 res = minIt->next();
856 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
857 int currentIt = iterators.indexOf(minIt);
858 const QByteArray &idExtra = idExtras.at(currentIt);
859 res->id = addTable2Id(res->id, idExtra);
860 }
861 return res;
862 } else {
863 if (currentRange < iterators.size()) {
864 do {
865 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
866 if (it->hasNext()) {
867 res = it->peek();
868 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
869 const QByteArray &idExtra = idExtras.at(currentRange);
870 res->id = addTable2Id(res->id, idExtra);
871 break;
872 }
873 currentRange++;
874 } while (currentRange < iterators.size());
875 }
876 return res;
877 }
878 }
879
880 //////////////////////////////////////////////////////////////////////////
881 // MTAPackAlgorithmDataIterator
882
MTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> * > & i,const QVector<QByteArray> & ie)883 MTAPackAlgorithmDataIterator::MTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> *> &i, const QVector<QByteArray> &ie)
884 : iterators(i), idExtras(ie) {
885 fetchNextData();
886 }
887
~MTAPackAlgorithmDataIterator()888 MTAPackAlgorithmDataIterator::~MTAPackAlgorithmDataIterator() {
889 qDeleteAll(iterators);
890 }
891
hasNext()892 bool MTAPackAlgorithmDataIterator::hasNext() {
893 return !nextData.readId.isEmpty();
894 }
895
next()896 PackAlgorithmData MTAPackAlgorithmDataIterator::next() {
897 PackAlgorithmData res = nextData;
898 fetchNextData();
899 return res;
900 }
901
peek()902 PackAlgorithmData MTAPackAlgorithmDataIterator::peek() {
903 return nextData;
904 }
905
fetchNextData()906 void MTAPackAlgorithmDataIterator::fetchNextData() {
907 PackAlgorithmData bestCandidate;
908 int bestRange = 0;
909 for (int i = 0; i < iterators.size(); i++) {
910 U2DbiIterator<PackAlgorithmData> *it = iterators[i];
911 if (!it->hasNext()) {
912 continue;
913 }
914 PackAlgorithmData d = it->peek();
915 if (bestCandidate.readId.isEmpty() || bestCandidate.leftmostPos > d.leftmostPos) {
916 bestCandidate = d;
917 bestRange = i;
918 }
919 }
920 nextData = bestCandidate;
921 if (!nextData.readId.isEmpty()) {
922 iterators[bestRange]->next();
923 const QByteArray &idExtra = idExtras.at(bestRange);
924 nextData.readId = addTable2Id(nextData.readId, idExtra);
925 }
926 }
927
928 } // namespace U2
929