1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include <U2Core/Timer.h>
23 #include <U2Core/U2AssemblyUtils.h>
24 #include <U2Core/U2OpStatusUtils.h>
25 #include <U2Core/U2SafePoints.h>
26 //#include <U2Core/U2SqlHelpers.h>
27
28 #include "MysqlMultiTableAssemblyAdapter.h"
29 #include "MysqlSingleTableAssemblyAdapter.h"
30 #include "mysql_dbi/MysqlAssemblyDbi.h"
31 #include "mysql_dbi/MysqlDbi.h"
32 #include "mysql_dbi/MysqlObjectDbi.h"
33 #include "util/AssemblyPackAlgorithm.h"
34
35 namespace U2 {
36
37 /****************************************************************/
38 /* Support functions */
39 /****************************************************************/
40
41 namespace {
42
toRange(const QVector<int> & startPos)43 QVector<U2Region> toRange(const QVector<int> &startPos) {
44 QVector<U2Region> res;
45
46 int prev = 0;
47 foreach (int pos, startPos) {
48 res << U2Region(prev, pos - prev);
49 prev = pos;
50 }
51
52 return res;
53 }
54
addTable2Id(const U2DataId & id,const QByteArray & idExtra)55 U2DataId addTable2Id(const U2DataId &id, const QByteArray &idExtra) {
56 SAFE_POINT(U2DbiUtils::toDbExtra(id).isEmpty(), "Extra field of the input U2DataId is not empty", U2DataId());
57 const quint64 dbiId = U2DbiUtils::toDbiId(id);
58 const U2DataId res = U2DbiUtils::toU2DataId(dbiId, U2Type::AssemblyRead, idExtra);
59 return res;
60 }
61
ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> & grid,int rowPos,int nElens)62 void ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> &grid, int rowPos, int nElens) {
63 int oldRows = grid.size();
64 if (oldRows > rowPos) {
65 return;
66 }
67
68 int newRows = rowPos + 1;
69 grid.resize(newRows);
70 for (int r = oldRows; r < newRows; r++) {
71 grid[r].resize(nElens);
72 }
73 }
74
75 } // unnamed namespace
76
77 /****************************************************************/
78 /* MysqlMtaSingleTableAdapter */
79 /****************************************************************/
80
MysqlMtaSingleTableAdapter(MysqlSingleTableAssemblyAdapter * adapter,int rowPos,int elenPos,const QByteArray & extra)81 MysqlMtaSingleTableAdapter::MysqlMtaSingleTableAdapter(MysqlSingleTableAssemblyAdapter *adapter,
82 int rowPos,
83 int elenPos,
84 const QByteArray &extra)
85 : singleTableAdapter(adapter),
86 rowPos(rowPos),
87 elenPos(elenPos),
88 idExtra(extra) {
89 }
90
91 /****************************************************************/
92 /* MysqlMultiTableAssemblyAdapter */
93 /****************************************************************/
94
MysqlMultiTableAssemblyAdapter(MysqlDbi * dbi,const U2DataId & assemblyId,const AssemblyCompressor * compressor,MysqlDbRef * db,U2OpStatus & os)95 MysqlMultiTableAssemblyAdapter::MysqlMultiTableAssemblyAdapter(MysqlDbi *dbi,
96 const U2DataId &assemblyId,
97 const AssemblyCompressor *compressor,
98 MysqlDbRef *db,
99 U2OpStatus &os)
100 : MysqlAssemblyAdapter(assemblyId, compressor, db),
101 dbi(dbi),
102 version(-1),
103 rowsPerRange(DEFAULT_ROWS_PER_TABLE) {
104 syncTables(os);
105 }
106
~MysqlMultiTableAssemblyAdapter()107 MysqlMultiTableAssemblyAdapter::~MysqlMultiTableAssemblyAdapter() {
108 clearTableAdaptersInfo();
109 }
110
countReads(const U2Region & r,U2OpStatus & os)111 qint64 MysqlMultiTableAssemblyAdapter::countReads(const U2Region &r, U2OpStatus &os) {
112 bool all = (r == U2_REGION_MAX);
113 qint64 sum = 0;
114
115 // use more sensitive algorithm for smaller regions with low amount of reads
116 // and not-very sensitive for huge regions with a lot of reads
117 int nReadsToUseNotPreciseAlgorithms = 1000 / (r.length + 1);
118 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
119 int n = a->singleTableAdapter->countReads(r, os);
120 if (n != 0 && !all && n < nReadsToUseNotPreciseAlgorithms) {
121 n = a->singleTableAdapter->countReadsPrecise(r, os);
122 }
123 CHECK_OP(os, sum);
124 sum += n;
125 }
126
127 return sum;
128 }
129
getMaxPackedRow(const U2Region & r,U2OpStatus & os)130 qint64 MysqlMultiTableAssemblyAdapter::getMaxPackedRow(const U2Region &r, U2OpStatus &os) {
131 qint64 max = 0;
132
133 // process only hi row adapters
134 int nRows = adaptersGrid.size();
135 for (int rowPos = nRows; --rowPos >= 0 && max == 0;) {
136 QVector<MysqlMtaSingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
137 for (int elenPos = 0, nElens = elenAdapters.size(); elenPos < nElens; elenPos++) {
138 MysqlMtaSingleTableAdapter *a = elenAdapters.at(elenPos);
139 if (a == nullptr) {
140 continue;
141 }
142
143 SAFE_POINT(a->rowPos == rowPos, "Incorrect row position", max);
144 qint64 n = a->singleTableAdapter->getMaxPackedRow(r, os);
145 SAFE_POINT(U2Region(rowsPerRange * rowPos, rowsPerRange).contains(n), "Invalid region", max);
146 max = qMax(max, n);
147 }
148 }
149
150 return max;
151 }
152
getMaxEndPos(U2OpStatus & os)153 qint64 MysqlMultiTableAssemblyAdapter::getMaxEndPos(U2OpStatus &os) {
154 // TODO: optimize by using gstart + maxReadLen for first n-1 tables
155 qint64 max = 0;
156
157 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
158 qint64 n = a->singleTableAdapter->getMaxEndPos(os);
159 CHECK_OP(os, max);
160 max = qMax(max, n);
161 }
162
163 return max;
164 }
165
getReads(const U2Region & r,U2OpStatus & os,bool sortedHint)166 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReads(const U2Region &r, U2OpStatus &os, bool sortedHint) {
167 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
168
169 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
170 iterators << a->singleTableAdapter->getReads(r, os, sortedHint);
171 CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
172 }
173
174 return new MysqlMtaReadsIterator(iterators, idExtras, sortedHint);
175 }
176
getReadsByRow(const U2Region & r,qint64 minRow,qint64 maxRow,U2OpStatus & os)177 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReadsByRow(const U2Region &r, qint64 minRow, qint64 maxRow, U2OpStatus &os) {
178 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
179 QVector<QByteArray> selectedIdExtras;
180 U2Region targetRowRange(minRow, maxRow - minRow);
181
182 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
183 const U2Region rowRegion(a->rowPos * rowsPerRange, rowsPerRange);
184 if (!rowRegion.intersects(targetRowRange)) {
185 continue;
186 }
187
188 iterators << a->singleTableAdapter->getReadsByRow(r, minRow, maxRow, os);
189 CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
190 selectedIdExtras << a->idExtra;
191 }
192
193 return new MysqlMtaReadsIterator(iterators, selectedIdExtras, false);
194 }
195
getReadsByName(const QByteArray & name,U2OpStatus & os)196 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReadsByName(const QByteArray &name, U2OpStatus &os) {
197 QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
198
199 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
200 iterators << a->singleTableAdapter->getReadsByName(name, os);
201 CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
202 }
203
204 return new MysqlMtaReadsIterator(iterators, idExtras, false);
205 }
206
addReads(U2DbiIterator<U2AssemblyRead> * it,U2AssemblyReadsImportInfo & ii,U2OpStatus & os)207 void MysqlMultiTableAssemblyAdapter::addReads(U2DbiIterator<U2AssemblyRead> *it, U2AssemblyReadsImportInfo &ii, U2OpStatus &os) {
208 bool empty = adaptersGrid.isEmpty();
209 if (empty) {
210 // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
211 initTables(QList<U2AssemblyRead>(), os);
212 CHECK_OP(os, );
213 }
214
215 bool packIsOn = empty;
216 qint64 prevLeftmostPos = -1;
217 PackAlgorithmContext packContext;
218
219 QVector<QVector<QList<U2AssemblyRead>>> readsGrid; // reads sorted by range
220 bool lastIteration = false;
221 qint64 readsInGrid = 0;
222
223 while (!os.isCoR()) {
224 int nElens = elenRanges.size();
225 if (it->hasNext()) {
226 U2AssemblyRead read = it->next();
227 CHECK_OP_BREAK(os);
228 int readLen = read->readSequence.length();
229 read->effectiveLen = readLen + U2AssemblyUtils::getCigarExtraLength(read->cigar);
230 int elenPos = getElenRangePosByLength(read->effectiveLen);
231
232 packIsOn = packIsOn && (read->leftmostPos >= prevLeftmostPos);
233 read->packedViewRow = packIsOn ? AssemblyPackAlgorithm::packRead(U2Region(read->leftmostPos, read->effectiveLen), packContext, os) : 0;
234 int rowPos = getRowRangePosByRow(read->packedViewRow);
235 ensureGridSize(readsGrid, rowPos, nElens);
236 readsGrid[rowPos][elenPos] << read;
237 ++ii.nReads;
238 ++readsInGrid;
239 } else {
240 lastIteration = true;
241 }
242
243 int nRows = readsGrid.size();
244 if (lastIteration || readsInGrid > N_READS_TO_FLUSH_TOTAL) {
245 for (int rowPos = 0; rowPos < nRows && !os.isCoR(); rowPos++) {
246 for (int elenPos = 0; elenPos < nElens && !os.isCoR(); elenPos++) {
247 QList<U2AssemblyRead> &rangeReads = readsGrid[rowPos][elenPos];
248 int nRangeReads = rangeReads.size();
249 if (nRangeReads == 0 || (!lastIteration && nRangeReads < N_READS_TO_FLUSH_PER_RANGE)) {
250 continue;
251 }
252
253 MysqlMtaSingleTableAdapter *adapter = getAdapterByRowAndElenRange(rowPos, elenPos, true, os);
254 U2AssemblyReadsImportInfo rangeReadsImportInfo(&ii);
255 // pass the same coverage info through all adapters to accumulate coverage
256 rangeReadsImportInfo.coverageInfo = ii.coverageInfo;
257 BufferedDbiIterator<U2AssemblyRead> rangeReadsIterator(rangeReads);
258 adapter->singleTableAdapter->addReads(&rangeReadsIterator, rangeReadsImportInfo, os);
259 ii.coverageInfo = rangeReadsImportInfo.coverageInfo;
260 readsInGrid -= rangeReads.size();
261 rangeReads.clear();
262 }
263 }
264 }
265 if (lastIteration) {
266 break;
267 }
268 }
269 createReadsIndexes(os);
270
271 if (packIsOn && !os.hasError()) {
272 ii.packStat.readsCount = ii.nReads;
273 ii.packStat.maxProw = packContext.maxProw;
274 ii.packed = true;
275 flushTables(os);
276 }
277 }
278
removeReads(const QList<U2DataId> & readIds,U2OpStatus & os)279 void MysqlMultiTableAssemblyAdapter::removeReads(const QList<U2DataId> &readIds, U2OpStatus &os) {
280 int nReads = readIds.size();
281
282 QHash<MysqlMtaSingleTableAdapter *, QList<U2DataId>> readsByAdapter;
283 for (int i = 0; i < nReads; i++) {
284 const U2DataId &readId = readIds[i];
285 int rowPos = getRowRangePosById(readId);
286 int elenPos = getElenRangePosById(readId);
287 MysqlMtaSingleTableAdapter *a = getAdapterByRowAndElenRange(rowPos, elenPos, false, os);
288
289 SAFE_POINT(a != nullptr, QString("No table adapter was found. row: %1, elen: %2").arg(rowPos).arg(elenPos), );
290
291 if (!readsByAdapter.contains(a)) {
292 readsByAdapter[a] = QList<U2DataId>();
293 }
294 readsByAdapter[a].append(readId);
295 }
296
297 foreach (MysqlMtaSingleTableAdapter *a, readsByAdapter.keys()) {
298 QList<U2DataId> &rangeReadIds = readsByAdapter[a];
299 a->singleTableAdapter->removeReads(rangeReadIds, os);
300 // TODO: remove adapters for empty tables. And tables as well
301 }
302 }
303
dropReadsTables(U2OpStatus & os)304 void MysqlMultiTableAssemblyAdapter::dropReadsTables(U2OpStatus &os) {
305 for (const QVector<MysqlMtaSingleTableAdapter *> &adaptersVector : qAsConst(adaptersGrid)) {
306 for (MysqlMtaSingleTableAdapter *adapter : qAsConst(adaptersVector)) {
307 if (adapter != nullptr) {
308 adapter->singleTableAdapter->dropReadsTables(os);
309 }
310 }
311 }
312 }
313
pack(U2AssemblyPackStat & stat,U2OpStatus & os)314 void MysqlMultiTableAssemblyAdapter::pack(U2AssemblyPackStat &stat, U2OpStatus &os) {
315 MysqlMultiTablePackAlgorithmAdapter packAdapter(this);
316
317 AssemblyPackAlgorithm::pack(packAdapter, stat, os);
318 CHECK_OP(os, );
319 packAdapter.releaseDbResources();
320
321 quint64 t0 = GTimer::currentTimeMicros();
322 packAdapter.migrateAll(os);
323 CHECK_OP(os, );
324 perfLog.trace(QString("Assembly: table migration pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
325
326 t0 = GTimer::currentTimeMicros();
327 // if new tables created during the pack algorithm -> create indexes
328 createReadsIndexes(os);
329 CHECK_OP(os, );
330 perfLog.trace(QString("Assembly: re-indexing pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
331
332 flushTables(os);
333 }
334
calculateCoverage(const U2Region & region,U2AssemblyCoverageStat & coverage,U2OpStatus & os)335 void MysqlMultiTableAssemblyAdapter::calculateCoverage(const U2Region ®ion, U2AssemblyCoverageStat &coverage, U2OpStatus &os) {
336 for (int i = 0; i < adapters.size(); ++i) {
337 MysqlMtaSingleTableAdapter *a = adapters.at(i);
338 a->singleTableAdapter->calculateCoverage(region, coverage, os);
339 CHECK_OP(os, );
340
341 os.setProgress((double(i + 1) / adapters.size()) * 100);
342 }
343 }
344
createReadsIndexes(U2OpStatus & os)345 void MysqlMultiTableAssemblyAdapter::createReadsIndexes(U2OpStatus &os) {
346 foreach (MysqlMtaSingleTableAdapter *a, adapters) {
347 a->singleTableAdapter->createReadsIndexes(os);
348 CHECK_OP(os, );
349 }
350 }
351
getElenRangePosByLength(qint64 readLength) const352 int MysqlMultiTableAssemblyAdapter::getElenRangePosByLength(qint64 readLength) const {
353 int nElenRanges = elenRanges.size();
354 for (int i = 0; i < nElenRanges; i++) {
355 const U2Region &r = elenRanges[i];
356 if (r.contains(readLength)) {
357 return i;
358 }
359 }
360
361 FAIL(QString("Read length does not fit any range: %1, number of ranges: %2").arg(readLength).arg(nElenRanges), nElenRanges - 1);
362 }
363
getElenRangePosById(const U2DataId & id) const364 int MysqlMultiTableAssemblyAdapter::getElenRangePosById(const U2DataId &id) const {
365 QByteArray extra = U2DbiUtils::toDbExtra(id);
366
367 SAFE_POINT(extra.size() == 4, QString("Illegal assembly read ID extra part. HEX: %1").arg(extra.toHex().constData()), -1);
368
369 const qint16 *data = (const qint16 *)extra.constData();
370 return int(data[1]);
371 }
372
getNumberOfElenRanges() const373 int MysqlMultiTableAssemblyAdapter::getNumberOfElenRanges() const {
374 return elenRanges.size();
375 }
376
getRowRangePosByRow(quint64 row) const377 int MysqlMultiTableAssemblyAdapter::getRowRangePosByRow(quint64 row) const {
378 return row / rowsPerRange;
379 }
380
getRowRangePosById(const U2DataId & id) const381 int MysqlMultiTableAssemblyAdapter::getRowRangePosById(const U2DataId &id) const {
382 QByteArray extra = U2DbiUtils::toDbExtra(id);
383
384 SAFE_POINT(extra.size() == 4, QString("Extra part size of assembly read ID is not correct. HEX(Extra): %1").arg(extra.toHex().constData()), -1);
385
386 const qint16 *data = (const qint16 *)extra.constData();
387 return int(data[0]);
388 }
389
getRowsPerRange() const390 int MysqlMultiTableAssemblyAdapter::getRowsPerRange() const {
391 return rowsPerRange;
392 }
393
getAdapters() const394 const QVector<MysqlMtaSingleTableAdapter *> &MysqlMultiTableAssemblyAdapter::getAdapters() const {
395 return adapters;
396 }
getIdExtrasPerRange() const397 const QVector<QByteArray> &MysqlMultiTableAssemblyAdapter::getIdExtrasPerRange() const {
398 return idExtras;
399 }
400
getDbRef() const401 MysqlDbRef *MysqlMultiTableAssemblyAdapter::getDbRef() const {
402 return dbi->getDbRef();
403 }
404
getAdapterByRowAndElenRange(int rowPos,int elenPos,bool createIfNotExits,U2OpStatus & os)405 MysqlMtaSingleTableAdapter *MysqlMultiTableAssemblyAdapter::getAdapterByRowAndElenRange(int rowPos, int elenPos, bool createIfNotExits, U2OpStatus &os) {
406 int nElens = elenRanges.size();
407 SAFE_POINT(elenPos < nElens, "Out of range", nullptr);
408 if (rowPos >= adaptersGrid.size()) {
409 SAFE_POINT(createIfNotExits, "Adapter is not exists", nullptr);
410 int oldRowSize = adaptersGrid.size();
411 int newRowSize = rowPos + 1;
412 adaptersGrid.resize(newRowSize);
413 for (int i = oldRowSize; i < newRowSize; i++) {
414 adaptersGrid[i].resize(nElens);
415 }
416 }
417
418 QVector<MysqlMtaSingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
419 SAFE_POINT(elenAdapters.size() == nElens, "Invalid adapters array", nullptr);
420 MysqlMtaSingleTableAdapter *adapter = elenAdapters.at(elenPos);
421 if (adapter == nullptr && createIfNotExits) {
422 adapter = createAdapter(rowPos, elenPos, os);
423 }
424
425 return adapter;
426 }
427
getTableSuffix(int rowPos,int elenPos)428 QString MysqlMultiTableAssemblyAdapter::getTableSuffix(int rowPos, int elenPos) {
429 SAFE_POINT(0 <= elenPos && elenPos < elenRanges.size(), "Out of range", "");
430
431 const U2Region eRegion = elenRanges[elenPos];
432 bool last = (elenPos + 1 == elenRanges.size());
433 return QString("%1_%2_%3").arg(eRegion.startPos).arg(last ? QString("U") : QString::number(eRegion.endPos())).arg(rowPos);
434 }
435
getIdExtra(int rowPos,int elenPos)436 QByteArray MysqlMultiTableAssemblyAdapter::getIdExtra(int rowPos, int elenPos) {
437 QByteArray res(4, 0);
438 qint16 *data = (qint16 *)res.data();
439 data[0] = (qint16)rowPos;
440 data[1] = (qint16)elenPos;
441 return res;
442 }
443
syncTables(U2OpStatus & os)444 void MysqlMultiTableAssemblyAdapter::syncTables(U2OpStatus &os) {
445 qint64 versionInDb = dbi->getObjectDbi()->getObjectVersion(assemblyId, os);
446 CHECK_OP(os, );
447
448 if (versionInDb <= version) {
449 return;
450 }
451
452 static const QString queryString = "SELECT idata FROM Assembly WHERE object = :object";
453 U2SqlQuery q(queryString, db, os);
454 q.bindDataId(":object", assemblyId);
455 if (q.step()) {
456 const QByteArray data = q.getBlob(0);
457 rereadTables(data, os);
458 CHECK_OP_EXT(os, version = versionInDb, );
459 }
460 }
461
initTables(const QList<U2AssemblyRead> & reads,U2OpStatus & os)462 void MysqlMultiTableAssemblyAdapter::initTables(const QList<U2AssemblyRead> &reads, U2OpStatus &os) {
463 SAFE_POINT(elenRanges.isEmpty(), "Effective ranges are already initialized", );
464
465 const int nReads = reads.size();
466 // TODO
467 if (false && nReads > 1000) {
468 // // get reads distribution first
469 // QVector<int> distribution(nReads / 10, 0);
470 // int* data = distribution.data();
471 // foreach(const U2AssemblyRead& read, reads) {
472 // int elen = read->readSequence.size() + U2AssemblyUtils::getCigarExtraLength(read->cigar);
473 // int idx = elen / 10;
474 // data[idx]++;
475 // }
476 // // derive regions
477 // // TODO:
478 } else {
479 QVector<int> starts;
480 starts << 50 << 100 << 200 << 400 << 800 << 4 * 1000 << 25 * 1000 << 100 * 1000 << 500 * 1000 << 2 * 1000 * 1000;
481 elenRanges << toRange(starts);
482 }
483
484 initAdaptersGrid(1, elenRanges.size());
485 flushTables(os);
486 }
487
rereadTables(const QByteArray & idata,U2OpStatus & os)488 void MysqlMultiTableAssemblyAdapter::rereadTables(const QByteArray &idata, U2OpStatus &os) {
489 QWriteLocker wl(&tablesSyncLock);
490
491 clearTableAdaptersInfo();
492
493 // format: N, N, N, N | N, N |.....
494 // elements are separated by | sign. First element encodes ranges, second prow step and max prow, others are for future extension
495 if (idata.isEmpty()) {
496 // assembly is empty - no index data was created
497 return;
498 }
499
500 const QList<QByteArray> elements = idata.split('|');
501 if (elements.size() < 2) {
502 os.setError(U2DbiL10n::tr("Failed to detect assembly storage format: %1").arg(idata.constData()));
503 return;
504 }
505 QByteArray elenData = elements.at(0);
506 QByteArray prowData = elements.at(1);
507
508 const QList<QByteArray> elenTokens = elenData.split(',');
509 U2Region prev(-1, 1);
510 bool parseOk = true;
511 foreach (const QByteArray &elenTok, elenTokens) {
512 int start = elenTok.toInt(&parseOk);
513 if (!parseOk || start < prev.endPos()) {
514 os.setError(U2DbiL10n::tr("Failed to parse range: %1, full: %2").arg(elenTok.constData()).arg(elenData.constData()));
515 return;
516 }
517
518 U2Region region(prev.endPos(), start - prev.endPos());
519 elenRanges << region;
520 prev = region;
521 }
522 elenRanges << U2Region(prev.endPos(), INT_MAX);
523
524 const QList<QByteArray> prowTokens = prowData.split(',');
525 int prange = prowTokens.at(0).toInt(&parseOk);
526
527 if (prange < 1 || !parseOk) {
528 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
529 return;
530 }
531
532 if (prowTokens.size() != 2) {
533 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
534 return;
535 }
536
537 int nRows = prowTokens.at(1).toInt(&parseOk);
538 if (nRows < 0 || !parseOk) {
539 os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
540 return;
541 }
542
543 // ok, all regions parsed, now create adapters
544 int nElens = elenRanges.size();
545 initAdaptersGrid(nRows, nElens);
546 for (int rowPos = 0; rowPos < nRows; rowPos++) {
547 for (int elenPos = 0; elenPos < nElens; elenPos++) {
548 const QString suffix = getTableSuffix(rowPos, elenPos);
549 const QString tableName = MysqlSingleTableAssemblyAdapter::getReadsTableName(assemblyId, 'M', suffix);
550 if (MysqlUtils::isTableExists(tableName, db, os)) {
551 createAdapter(rowPos, elenPos, os);
552 }
553 }
554 }
555 }
556
flushTables(U2OpStatus & os)557 void MysqlMultiTableAssemblyAdapter::flushTables(U2OpStatus &os) {
558 MysqlTransaction t(db, os);
559 Q_UNUSED(t);
560
561 if (adaptersGrid.isEmpty()) {
562 // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
563 initTables(QList<U2AssemblyRead>(), os);
564 CHECK_OP(os, );
565 }
566
567 QByteArray idata;
568 for (int i = 0; i < elenRanges.size(); i++) {
569 int rangeStart = elenRanges[i].startPos;
570 if (!idata.isEmpty()) {
571 idata += ',';
572 }
573 idata += QByteArray::number(rangeStart);
574 }
575 idata += '|' + QByteArray::number(rowsPerRange) + ',' + QByteArray::number(adaptersGrid.size());
576
577 static const QString queryString = "UPDATE Assembly SET idata = :idata WHERE object = :object";
578 U2SqlQuery q(queryString, db, os);
579 q.bindBlob(":idata", idata);
580 q.bindDataId(":object", assemblyId);
581 q.execute();
582 }
583
clearTableAdaptersInfo()584 void MysqlMultiTableAssemblyAdapter::clearTableAdaptersInfo() {
585 qDeleteAll(adapters);
586 adaptersGrid.clear();
587 idExtras.clear();
588 elenRanges.clear();
589 }
590
createAdapter(int rowPos,int elenPos,U2OpStatus & os)591 MysqlMtaSingleTableAdapter *MysqlMultiTableAssemblyAdapter::createAdapter(int rowPos, int elenPos, U2OpStatus &os) {
592 SAFE_POINT(0 <= rowPos && rowPos < adaptersGrid.size(), "Out of range", nullptr);
593 SAFE_POINT(0 <= elenPos && elenPos < adaptersGrid.at(rowPos).size(), "Out of range", nullptr);
594 SAFE_POINT(nullptr == adaptersGrid.at(rowPos).at(elenPos), "Adapter is already created", nullptr);
595
596 const QString suffix = getTableSuffix(rowPos, elenPos);
597 const U2Region &elenRange = elenRanges.at(elenPos);
598 const QByteArray idExtra = getIdExtra(rowPos, elenPos);
599
600 MysqlSingleTableAssemblyAdapter *sa = new MysqlSingleTableAssemblyAdapter(dbi, assemblyId, 'M', suffix, compressor, db, os);
601 sa->enableRangeTableMode(elenRange.startPos, elenRange.endPos());
602
603 MysqlMtaSingleTableAdapter *ma = new MysqlMtaSingleTableAdapter(sa, rowPos, elenPos, idExtra);
604
605 adapters << ma;
606 idExtras << idExtra;
607 adaptersGrid[rowPos][elenPos] = ma;
608
609 return ma;
610 }
611
initAdaptersGrid(int nRows,int nElens)612 void MysqlMultiTableAssemblyAdapter::initAdaptersGrid(int nRows, int nElens) {
613 SAFE_POINT(adaptersGrid.isEmpty(), "Adapters are already initialized", );
614
615 adaptersGrid.resize(nRows);
616 for (int i = 0; i < nRows; i++) {
617 adaptersGrid[i] = QVector<MysqlMtaSingleTableAdapter *>(nElens, nullptr);
618 }
619 }
620
621 //////////////////////////////////////////////////////////////////////////
622 // MysqlReadTableMigrationData
623
MysqlReadTableMigrationData()624 MysqlReadTableMigrationData::MysqlReadTableMigrationData()
625 : readId(-1),
626 oldTable(nullptr),
627 newProw(-1) {
628 }
629
MysqlReadTableMigrationData(qint64 oldId,MysqlMtaSingleTableAdapter * oldT,int newP)630 MysqlReadTableMigrationData::MysqlReadTableMigrationData(qint64 oldId, MysqlMtaSingleTableAdapter *oldT, int newP)
631 : readId(oldId),
632 oldTable(oldT),
633 newProw(newP) {
634 }
635
636 //////////////////////////////////////////////////////////////////////////
637 // pack adapter
638
MysqlMultiTablePackAlgorithmAdapter(MysqlMultiTableAssemblyAdapter * ma)639 MysqlMultiTablePackAlgorithmAdapter::MysqlMultiTablePackAlgorithmAdapter(MysqlMultiTableAssemblyAdapter *ma)
640 : multiTableAdapter(ma) {
641 MysqlDbRef *db = multiTableAdapter->getDbRef();
642 int nElens = multiTableAdapter->getNumberOfElenRanges();
643 ensureGridSize(nElens);
644
645 foreach (MysqlMtaSingleTableAdapter *a, multiTableAdapter->getAdapters()) {
646 MysqlSingleTablePackAlgorithmAdapter *sa = new MysqlSingleTablePackAlgorithmAdapter(db, a->singleTableAdapter->getReadsTableName());
647 packAdapters << sa;
648 if (packAdaptersGrid.size() <= a->rowPos) {
649 packAdaptersGrid.resize(a->rowPos + 1);
650 }
651
652 if (packAdaptersGrid[a->rowPos].size() <= a->elenPos) {
653 packAdaptersGrid[a->rowPos].resize(a->elenPos + 1);
654 }
655
656 packAdaptersGrid[a->rowPos][a->elenPos] = sa;
657 }
658 }
659
~MysqlMultiTablePackAlgorithmAdapter()660 MysqlMultiTablePackAlgorithmAdapter::~MysqlMultiTablePackAlgorithmAdapter() {
661 qDeleteAll(packAdapters);
662 }
663
selectAllReads(U2OpStatus & os)664 U2DbiIterator<PackAlgorithmData> *MysqlMultiTablePackAlgorithmAdapter::selectAllReads(U2OpStatus &os) {
665 QVector<U2DbiIterator<PackAlgorithmData> *> iterators;
666 foreach (MysqlSingleTablePackAlgorithmAdapter *a, packAdapters) {
667 iterators << a->selectAllReads(os);
668 }
669
670 return new MysqlMTAPackAlgorithmDataIterator(iterators, multiTableAdapter->getIdExtrasPerRange());
671 }
672
assignProw(const U2DataId & readId,qint64 prow,U2OpStatus & os)673 void MysqlMultiTablePackAlgorithmAdapter::assignProw(const U2DataId &readId, qint64 prow, U2OpStatus &os) {
674 int elenPos = multiTableAdapter->getElenRangePosById(readId);
675 int oldRowPos = multiTableAdapter->getRowRangePosById(readId);
676 int newRowPos = multiTableAdapter->getRowRangePosByRow(prow);
677
678 MysqlSingleTablePackAlgorithmAdapter *sa = nullptr;
679 if (newRowPos == oldRowPos) {
680 sa = packAdaptersGrid[oldRowPos][elenPos];
681 sa->assignProw(readId, prow, os);
682 return;
683 }
684 ensureGridSize(newRowPos + 1);
685
686 sa = packAdaptersGrid[newRowPos][elenPos];
687 MysqlMtaSingleTableAdapter *oldA = multiTableAdapter->getAdapterByRowAndElenRange(oldRowPos, elenPos, false, os);
688 MysqlMtaSingleTableAdapter *newA = multiTableAdapter->getAdapterByRowAndElenRange(newRowPos, elenPos, true, os);
689
690 SAFE_POINT(oldA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(oldRowPos).arg(elenPos), );
691 SAFE_POINT(newA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(newRowPos).arg(elenPos), );
692 SAFE_POINT_OP(os, );
693
694 if (sa == nullptr) {
695 sa = new MysqlSingleTablePackAlgorithmAdapter(multiTableAdapter->getDbRef(), newA->singleTableAdapter->getReadsTableName());
696 packAdapters << sa;
697 packAdaptersGrid[newRowPos][elenPos] = sa;
698 }
699
700 QVector<MysqlReadTableMigrationData> &newTableData = migrations[newA];
701 newTableData.append(MysqlReadTableMigrationData(U2DbiUtils::toDbiId(readId), oldA, prow));
702 // TODO: add mem check here!
703 }
704
releaseDbResources()705 void MysqlMultiTablePackAlgorithmAdapter::releaseDbResources() {
706 foreach (MysqlSingleTablePackAlgorithmAdapter *a, packAdapters) {
707 a->releaseDbResources();
708 }
709 }
710
migrateAll(U2OpStatus & os)711 void MysqlMultiTablePackAlgorithmAdapter::migrateAll(U2OpStatus &os) {
712 qint64 nReadsToMigrate = 0;
713 foreach (MysqlMtaSingleTableAdapter *newTable, migrations.keys()) {
714 const QVector<MysqlReadTableMigrationData> &data = migrations[newTable];
715 nReadsToMigrate += data.size();
716 }
717
718 if (nReadsToMigrate == 0) {
719 return;
720 }
721
722 qint64 nReadsTotal = multiTableAdapter->countReads(U2_REGION_MAX, os);
723 qint64 migrationPercent = nReadsToMigrate * 100 / nReadsTotal;
724
725 perfLog.trace(QString("Assembly: starting reads migration process. Reads to migrate: %1, total: %2 (%3%)").arg(nReadsToMigrate).arg(nReadsTotal).arg(migrationPercent));
726
727 if (migrationPercent > MAX_PERCENT_TO_REINDEX) {
728 perfLog.trace("Assembly: dropping old indexes first");
729 foreach (MysqlMtaSingleTableAdapter *adapter, multiTableAdapter->getAdapters()) {
730 adapter->singleTableAdapter->dropReadsIndexes(os);
731 }
732 perfLog.trace("Assembly: indexes are dropped");
733 }
734
735 SAFE_POINT_OP(os, );
736 int nMigrated = 0;
737 foreach (MysqlMtaSingleTableAdapter *newTable, migrations.keys()) {
738 const QVector<MysqlReadTableMigrationData> &data = migrations[newTable];
739 migrate(newTable, data, nMigrated, nReadsToMigrate, os);
740 nMigrated += data.size();
741 }
742 migrations.clear();
743 }
744
ensureGridSize(int nRows)745 void MysqlMultiTablePackAlgorithmAdapter::ensureGridSize(int nRows) {
746 int oldNRows = packAdaptersGrid.size();
747 if (oldNRows < nRows) {
748 int nElens = multiTableAdapter->getNumberOfElenRanges();
749 packAdaptersGrid.resize(nRows);
750 for (int i = oldNRows; i < nRows; i++) {
751 packAdaptersGrid[i].resize(nElens);
752 }
753 }
754 }
755
migrate(MysqlMtaSingleTableAdapter * newA,const QVector<MysqlReadTableMigrationData> & data,qint64 migratedBefore,qint64 totalMigrationCount,U2OpStatus & os)756 void MysqlMultiTablePackAlgorithmAdapter::migrate(MysqlMtaSingleTableAdapter *newA, const QVector<MysqlReadTableMigrationData> &data, qint64 migratedBefore, qint64 totalMigrationCount, U2OpStatus &os) {
757 // delete reads from old tables, and insert into new one
758 QHash<MysqlMtaSingleTableAdapter *, QVector<MysqlReadTableMigrationData>> readsByOldTable;
759 foreach (const MysqlReadTableMigrationData &d, data) {
760 readsByOldTable[d.oldTable].append(d);
761 }
762
763 MysqlDbRef *db = multiTableAdapter->getDbRef();
764 MysqlTransaction t(db, os);
765 Q_UNUSED(t);
766
767 foreach (MysqlMtaSingleTableAdapter *oldA, readsByOldTable.keys()) {
768 const QVector<MysqlReadTableMigrationData> &migData = readsByOldTable[oldA];
769 if (migData.isEmpty()) {
770 continue;
771 }
772
773 QString oldTable = oldA->singleTableAdapter->getReadsTableName();
774 QString newTable = newA->singleTableAdapter->getReadsTableName();
775 QString idsTable = "tmp_mig_" + oldTable; // TODO
776
777 #ifdef _DEBUG
778 qint64 nOldReads1 = U2SqlQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
779 qint64 nNewReads1 = U2SqlQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
780 int readsMoved = migData.size();
781 int rowsPerRange = multiTableAdapter->getRowsPerRange();
782 U2Region newProwRegion(newA->rowPos * rowsPerRange, rowsPerRange);
783 #endif
784
785 perfLog.trace(QString("Assembly: running reads migration from %1 to %2 number of reads: %3").arg(oldTable).arg(newTable).arg(migData.size()));
786 quint64 t0 = GTimer::currentTimeMicros();
787
788 { // nested block is needed to ensure all queries are finalized
789
790 static const QString tempTableQuery = "CREATE TEMPORARY TABLE %1(id INTEGER PRIMARY KEY, prow INTEGER NOT NULL)";
791 U2SqlQuery(tempTableQuery.arg(idsTable), db, os).execute();
792 CHECK_OP(os, );
793
794 static const QString insertQuery = "INSERT INTO %1(id, prow) VALUES(:id, :prow)";
795 for (const MysqlReadTableMigrationData &d : qAsConst(migData)) {
796 U2SqlQuery insertIds(insertQuery.arg(idsTable), db, os);
797 insertIds.bindInt64(":id", d.readId);
798 insertIds.bindInt32(":prow", d.newProw);
799 #ifdef _DEBUG
800 SAFE_POINT(newProwRegion.contains(d.newProw), "Invalid region", );
801 #endif
802 insertIds.execute();
803 CHECK_OP_BREAK(os);
804 }
805
806 if (!os.isCoR()) {
807 static const QString insertString = "INSERT INTO %1(prow, name, gstart, elen, flags, mq, data) "
808 "SELECT %3.prow, name, gstart, elen, flags, mq, data FROM %2, %3 WHERE %2.id = %3.id";
809 U2SqlQuery(insertString.arg(newTable).arg(oldTable).arg(idsTable), db, os).execute();
810
811 static const QString deleteString = "DELETE A.* FROM %1 AS A INNER JOIN %2 AS B ON A.id = B.id";
812 U2SqlQuery(deleteString.arg(oldTable).arg(idsTable), db, os).execute();
813 }
814 }
815 U2OpStatusImpl osStub; // using stub here -> this operation must be performed even if any of internal queries failed
816 static const QString dropTableString = "DROP TABLE IF EXISTS %1";
817 U2SqlQuery(dropTableString.arg(idsTable), db, osStub).execute();
818
819 qint64 nMigrated = migratedBefore + migData.size();
820 perfLog.trace(QString("Assembly: reads migration from %1 to %2 finished, time %3 seconds, progress: %4/%5 (%6%)")
821 .arg(oldTable)
822 .arg(newTable)
823 .arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000))
824 .arg(nMigrated)
825 .arg(totalMigrationCount)
826 .arg(100 * nMigrated / totalMigrationCount));
827
828 #ifdef _DEBUG
829 qint64 nOldReads2 = U2SqlQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
830 qint64 nNewReads2 = U2SqlQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
831 SAFE_POINT(nOldReads1 + nNewReads1 == nOldReads2 + nNewReads2, "Invalid reads count", );
832 SAFE_POINT(nNewReads1 + readsMoved == nNewReads2, "Invalid reads count", );
833 #endif
834 }
835 }
836
837 //////////////////////////////////////////////////////////////////////////
838 // MTAReadsIterator
839
MysqlMtaReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> * > & i,const QVector<QByteArray> & ie,bool sorted)840 MysqlMtaReadsIterator::MysqlMtaReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> *> &i, const QVector<QByteArray> &ie, bool sorted)
841 : iterators(i), currentRange(0), idExtras(ie), sortedHint(sorted) {
842 }
843
~MysqlMtaReadsIterator()844 MysqlMtaReadsIterator::~MysqlMtaReadsIterator() {
845 qDeleteAll(iterators);
846 }
847
848 // TODO: remove copy-paste from this code
hasNext()849 bool MysqlMtaReadsIterator::hasNext() {
850 if (sortedHint) {
851 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
852 if (it->hasNext()) {
853 return true;
854 }
855 }
856 return false;
857 } else {
858 bool res = currentRange < iterators.size();
859 if (res) {
860 do {
861 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
862 res = it->hasNext();
863 if (res) {
864 break;
865 }
866 currentRange++;
867 } while (currentRange < iterators.size());
868 }
869 return res;
870 }
871 }
872
next()873 U2AssemblyRead MysqlMtaReadsIterator::next() {
874 U2AssemblyRead res;
875 if (sortedHint) {
876 qint64 minPos = LLONG_MAX;
877 U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
878 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
879 if (it->hasNext()) {
880 U2AssemblyRead candidate = it->peek();
881 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
882 if (candidate->leftmostPos < minPos) {
883 minIt = it;
884 minPos = candidate->leftmostPos;
885 }
886 }
887 }
888 if (nullptr != minIt) {
889 res = minIt->next();
890 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
891 int currentIt = iterators.indexOf(minIt);
892 const QByteArray &idExtra = idExtras.at(currentIt);
893 res->id = addTable2Id(res->id, idExtra);
894 }
895 return res;
896 } else {
897 if (currentRange < iterators.size()) {
898 do {
899 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
900 if (it->hasNext()) {
901 res = it->next();
902 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
903 const QByteArray &idExtra = idExtras.at(currentRange);
904 res->id = addTable2Id(res->id, idExtra);
905 break;
906 }
907 currentRange++;
908 } while (currentRange < iterators.size());
909 }
910 return res;
911 }
912 }
913
peek()914 U2AssemblyRead MysqlMtaReadsIterator::peek() {
915 U2AssemblyRead res;
916 if (sortedHint) {
917 qint64 minPos = LLONG_MAX;
918 U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
919 foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
920 if (it->hasNext()) {
921 U2AssemblyRead candidate = it->peek();
922 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
923 if (candidate->leftmostPos < minPos) {
924 minIt = it;
925 minPos = candidate->leftmostPos;
926 }
927 }
928 }
929 if (nullptr != minIt) {
930 res = minIt->next();
931 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
932 int currentIt = iterators.indexOf(minIt);
933 const QByteArray &idExtra = idExtras.at(currentIt);
934 res->id = addTable2Id(res->id, idExtra);
935 }
936 return res;
937 } else {
938 if (currentRange < iterators.size()) {
939 do {
940 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
941 if (it->hasNext()) {
942 res = it->peek();
943 SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
944 const QByteArray &idExtra = idExtras.at(currentRange);
945 res->id = addTable2Id(res->id, idExtra);
946 break;
947 }
948 currentRange++;
949 } while (currentRange < iterators.size());
950 }
951 return res;
952 }
953 }
954
955 //////////////////////////////////////////////////////////////////////////
956 // MTAPackAlgorithmDataIterator
957
MysqlMTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> * > & i,const QVector<QByteArray> & ie)958 MysqlMTAPackAlgorithmDataIterator::MysqlMTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> *> &i, const QVector<QByteArray> &ie)
959 : iterators(i), idExtras(ie) {
960 fetchNextData();
961 }
962
~MysqlMTAPackAlgorithmDataIterator()963 MysqlMTAPackAlgorithmDataIterator::~MysqlMTAPackAlgorithmDataIterator() {
964 qDeleteAll(iterators);
965 }
966
hasNext()967 bool MysqlMTAPackAlgorithmDataIterator::hasNext() {
968 return !nextData.readId.isEmpty();
969 }
970
next()971 PackAlgorithmData MysqlMTAPackAlgorithmDataIterator::next() {
972 PackAlgorithmData res = nextData;
973 fetchNextData();
974 return res;
975 }
976
peek()977 PackAlgorithmData MysqlMTAPackAlgorithmDataIterator::peek() {
978 return nextData;
979 }
980
fetchNextData()981 void MysqlMTAPackAlgorithmDataIterator::fetchNextData() {
982 PackAlgorithmData bestCandidate;
983 int bestRange = 0;
984 for (int i = 0; i < iterators.size(); i++) {
985 U2DbiIterator<PackAlgorithmData> *it = iterators[i];
986 if (!it->hasNext()) {
987 continue;
988 }
989 PackAlgorithmData d = it->peek();
990 if (bestCandidate.readId.isEmpty() || bestCandidate.leftmostPos > d.leftmostPos) {
991 bestCandidate = d;
992 bestRange = i;
993 }
994 }
995 nextData = bestCandidate;
996 if (!nextData.readId.isEmpty()) {
997 iterators[bestRange]->next();
998 const QByteArray &idExtra = idExtras.at(bestRange);
999 nextData.readId = addTable2Id(nextData.readId, idExtra);
1000 }
1001 }
1002
1003 } // namespace U2
1004