1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include <U2Core/Timer.h>
23 #include <U2Core/U2AssemblyUtils.h>
24 #include <U2Core/U2OpStatusUtils.h>
25 #include <U2Core/U2SafePoints.h>
26 //#include <U2Core/U2SqlHelpers.h>
27 
28 #include "MysqlMultiTableAssemblyAdapter.h"
29 #include "MysqlSingleTableAssemblyAdapter.h"
30 #include "mysql_dbi/MysqlAssemblyDbi.h"
31 #include "mysql_dbi/MysqlDbi.h"
32 #include "mysql_dbi/MysqlObjectDbi.h"
33 #include "util/AssemblyPackAlgorithm.h"
34 
35 namespace U2 {
36 
37 /****************************************************************/
38 /* Support functions */
39 /****************************************************************/
40 
41 namespace {
42 
/**
 * Converts a list of increasing boundary positions into consecutive regions.
 * The first region starts at 0; every value closes the current region and opens the next one.
 */
QVector<U2Region> toRange(const QVector<int> &startPos) {
    QVector<U2Region> ranges;
    ranges.reserve(startPos.size());

    int rangeStart = 0;
    for (int i = 0, n = startPos.size(); i < n; i++) {
        const int rangeEnd = startPos.at(i);
        ranges.append(U2Region(rangeStart, rangeEnd - rangeStart));
        rangeStart = rangeEnd;
    }

    return ranges;
}
54 
/**
 * Builds an assembly read ID that keeps the DBI id of 'id' and carries 'idExtra' as its extra part.
 * The input ID must not have an extra part yet; otherwise an empty ID is returned.
 */
U2DataId addTable2Id(const U2DataId &id, const QByteArray &idExtra) {
    const QByteArray currentExtra = U2DbiUtils::toDbExtra(id);
    SAFE_POINT(currentExtra.isEmpty(), "Extra field of the input U2DataId is not empty", U2DataId());

    const quint64 dbiId = U2DbiUtils::toDbiId(id);
    return U2DbiUtils::toU2DataId(dbiId, U2Type::AssemblyRead, idExtra);
}
61 
ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> & grid,int rowPos,int nElens)62 void ensureGridSize(QVector<QVector<QList<U2AssemblyRead>>> &grid, int rowPos, int nElens) {
63     int oldRows = grid.size();
64     if (oldRows > rowPos) {
65         return;
66     }
67 
68     int newRows = rowPos + 1;
69     grid.resize(newRows);
70     for (int r = oldRows; r < newRows; r++) {
71         grid[r].resize(nElens);
72     }
73 }
74 
75 }  // unnamed namespace
76 
77 /****************************************************************/
78 /* MysqlMtaSingleTableAdapter */
79 /****************************************************************/
80 
MysqlMtaSingleTableAdapter(MysqlSingleTableAssemblyAdapter * adapter,int rowPos,int elenPos,const QByteArray & extra)81 MysqlMtaSingleTableAdapter::MysqlMtaSingleTableAdapter(MysqlSingleTableAssemblyAdapter *adapter,
82                                                        int rowPos,
83                                                        int elenPos,
84                                                        const QByteArray &extra)
85     : singleTableAdapter(adapter),
86       rowPos(rowPos),
87       elenPos(elenPos),
88       idExtra(extra) {
89 }
90 
91 /****************************************************************/
92 /* MysqlMultiTableAssemblyAdapter */
93 /****************************************************************/
94 
MysqlMultiTableAssemblyAdapter(MysqlDbi * dbi,const U2DataId & assemblyId,const AssemblyCompressor * compressor,MysqlDbRef * db,U2OpStatus & os)95 MysqlMultiTableAssemblyAdapter::MysqlMultiTableAssemblyAdapter(MysqlDbi *dbi,
96                                                                const U2DataId &assemblyId,
97                                                                const AssemblyCompressor *compressor,
98                                                                MysqlDbRef *db,
99                                                                U2OpStatus &os)
100     : MysqlAssemblyAdapter(assemblyId, compressor, db),
101       dbi(dbi),
102       version(-1),
103       rowsPerRange(DEFAULT_ROWS_PER_TABLE) {
104     syncTables(os);
105 }
106 
/** Deletes the per-table adapter wrappers and drops the cached table layout via clearTableAdaptersInfo(). */
MysqlMultiTableAssemblyAdapter::~MysqlMultiTableAssemblyAdapter() {
    clearTableAdaptersInfo();
}
110 
countReads(const U2Region & r,U2OpStatus & os)111 qint64 MysqlMultiTableAssemblyAdapter::countReads(const U2Region &r, U2OpStatus &os) {
112     bool all = (r == U2_REGION_MAX);
113     qint64 sum = 0;
114 
115     // use more sensitive algorithm for smaller regions with low amount of reads
116     // and not-very sensitive for huge regions with a lot of reads
117     int nReadsToUseNotPreciseAlgorithms = 1000 / (r.length + 1);
118     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
119         int n = a->singleTableAdapter->countReads(r, os);
120         if (n != 0 && !all && n < nReadsToUseNotPreciseAlgorithms) {
121             n = a->singleTableAdapter->countReadsPrecise(r, os);
122         }
123         CHECK_OP(os, sum);
124         sum += n;
125     }
126 
127     return sum;
128 }
129 
getMaxPackedRow(const U2Region & r,U2OpStatus & os)130 qint64 MysqlMultiTableAssemblyAdapter::getMaxPackedRow(const U2Region &r, U2OpStatus &os) {
131     qint64 max = 0;
132 
133     // process only hi row adapters
134     int nRows = adaptersGrid.size();
135     for (int rowPos = nRows; --rowPos >= 0 && max == 0;) {
136         QVector<MysqlMtaSingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
137         for (int elenPos = 0, nElens = elenAdapters.size(); elenPos < nElens; elenPos++) {
138             MysqlMtaSingleTableAdapter *a = elenAdapters.at(elenPos);
139             if (a == nullptr) {
140                 continue;
141             }
142 
143             SAFE_POINT(a->rowPos == rowPos, "Incorrect row position", max);
144             qint64 n = a->singleTableAdapter->getMaxPackedRow(r, os);
145             SAFE_POINT(U2Region(rowsPerRange * rowPos, rowsPerRange).contains(n), "Invalid region", max);
146             max = qMax(max, n);
147         }
148     }
149 
150     return max;
151 }
152 
getMaxEndPos(U2OpStatus & os)153 qint64 MysqlMultiTableAssemblyAdapter::getMaxEndPos(U2OpStatus &os) {
154     // TODO: optimize by using gstart + maxReadLen for first n-1 tables
155     qint64 max = 0;
156 
157     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
158         qint64 n = a->singleTableAdapter->getMaxEndPos(os);
159         CHECK_OP(os, max);
160         max = qMax(max, n);
161     }
162 
163     return max;
164 }
165 
getReads(const U2Region & r,U2OpStatus & os,bool sortedHint)166 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReads(const U2Region &r, U2OpStatus &os, bool sortedHint) {
167     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
168 
169     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
170         iterators << a->singleTableAdapter->getReads(r, os, sortedHint);
171         CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
172     }
173 
174     return new MysqlMtaReadsIterator(iterators, idExtras, sortedHint);
175 }
176 
getReadsByRow(const U2Region & r,qint64 minRow,qint64 maxRow,U2OpStatus & os)177 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReadsByRow(const U2Region &r, qint64 minRow, qint64 maxRow, U2OpStatus &os) {
178     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
179     QVector<QByteArray> selectedIdExtras;
180     U2Region targetRowRange(minRow, maxRow - minRow);
181 
182     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
183         const U2Region rowRegion(a->rowPos * rowsPerRange, rowsPerRange);
184         if (!rowRegion.intersects(targetRowRange)) {
185             continue;
186         }
187 
188         iterators << a->singleTableAdapter->getReadsByRow(r, minRow, maxRow, os);
189         CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
190         selectedIdExtras << a->idExtra;
191     }
192 
193     return new MysqlMtaReadsIterator(iterators, selectedIdExtras, false);
194 }
195 
getReadsByName(const QByteArray & name,U2OpStatus & os)196 U2DbiIterator<U2AssemblyRead> *MysqlMultiTableAssemblyAdapter::getReadsByName(const QByteArray &name, U2OpStatus &os) {
197     QVector<U2DbiIterator<U2AssemblyRead> *> iterators;
198 
199     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
200         iterators << a->singleTableAdapter->getReadsByName(name, os);
201         CHECK_OP_EXT(os, qDeleteAll(iterators), nullptr);
202     }
203 
204     return new MysqlMtaReadsIterator(iterators, idExtras, false);
205 }
206 
addReads(U2DbiIterator<U2AssemblyRead> * it,U2AssemblyReadsImportInfo & ii,U2OpStatus & os)207 void MysqlMultiTableAssemblyAdapter::addReads(U2DbiIterator<U2AssemblyRead> *it, U2AssemblyReadsImportInfo &ii, U2OpStatus &os) {
208     bool empty = adaptersGrid.isEmpty();
209     if (empty) {
210         // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
211         initTables(QList<U2AssemblyRead>(), os);
212         CHECK_OP(os, );
213     }
214 
215     bool packIsOn = empty;
216     qint64 prevLeftmostPos = -1;
217     PackAlgorithmContext packContext;
218 
219     QVector<QVector<QList<U2AssemblyRead>>> readsGrid;  // reads sorted by range
220     bool lastIteration = false;
221     qint64 readsInGrid = 0;
222 
223     while (!os.isCoR()) {
224         int nElens = elenRanges.size();
225         if (it->hasNext()) {
226             U2AssemblyRead read = it->next();
227             CHECK_OP_BREAK(os);
228             int readLen = read->readSequence.length();
229             read->effectiveLen = readLen + U2AssemblyUtils::getCigarExtraLength(read->cigar);
230             int elenPos = getElenRangePosByLength(read->effectiveLen);
231 
232             packIsOn = packIsOn && (read->leftmostPos >= prevLeftmostPos);
233             read->packedViewRow = packIsOn ? AssemblyPackAlgorithm::packRead(U2Region(read->leftmostPos, read->effectiveLen), packContext, os) : 0;
234             int rowPos = getRowRangePosByRow(read->packedViewRow);
235             ensureGridSize(readsGrid, rowPos, nElens);
236             readsGrid[rowPos][elenPos] << read;
237             ++ii.nReads;
238             ++readsInGrid;
239         } else {
240             lastIteration = true;
241         }
242 
243         int nRows = readsGrid.size();
244         if (lastIteration || readsInGrid > N_READS_TO_FLUSH_TOTAL) {
245             for (int rowPos = 0; rowPos < nRows && !os.isCoR(); rowPos++) {
246                 for (int elenPos = 0; elenPos < nElens && !os.isCoR(); elenPos++) {
247                     QList<U2AssemblyRead> &rangeReads = readsGrid[rowPos][elenPos];
248                     int nRangeReads = rangeReads.size();
249                     if (nRangeReads == 0 || (!lastIteration && nRangeReads < N_READS_TO_FLUSH_PER_RANGE)) {
250                         continue;
251                     }
252 
253                     MysqlMtaSingleTableAdapter *adapter = getAdapterByRowAndElenRange(rowPos, elenPos, true, os);
254                     U2AssemblyReadsImportInfo rangeReadsImportInfo(&ii);
255                     // pass the same coverage info through all adapters to accumulate coverage
256                     rangeReadsImportInfo.coverageInfo = ii.coverageInfo;
257                     BufferedDbiIterator<U2AssemblyRead> rangeReadsIterator(rangeReads);
258                     adapter->singleTableAdapter->addReads(&rangeReadsIterator, rangeReadsImportInfo, os);
259                     ii.coverageInfo = rangeReadsImportInfo.coverageInfo;
260                     readsInGrid -= rangeReads.size();
261                     rangeReads.clear();
262                 }
263             }
264         }
265         if (lastIteration) {
266             break;
267         }
268     }
269     createReadsIndexes(os);
270 
271     if (packIsOn && !os.hasError()) {
272         ii.packStat.readsCount = ii.nReads;
273         ii.packStat.maxProw = packContext.maxProw;
274         ii.packed = true;
275         flushTables(os);
276     }
277 }
278 
removeReads(const QList<U2DataId> & readIds,U2OpStatus & os)279 void MysqlMultiTableAssemblyAdapter::removeReads(const QList<U2DataId> &readIds, U2OpStatus &os) {
280     int nReads = readIds.size();
281 
282     QHash<MysqlMtaSingleTableAdapter *, QList<U2DataId>> readsByAdapter;
283     for (int i = 0; i < nReads; i++) {
284         const U2DataId &readId = readIds[i];
285         int rowPos = getRowRangePosById(readId);
286         int elenPos = getElenRangePosById(readId);
287         MysqlMtaSingleTableAdapter *a = getAdapterByRowAndElenRange(rowPos, elenPos, false, os);
288 
289         SAFE_POINT(a != nullptr, QString("No table adapter was found. row: %1, elen: %2").arg(rowPos).arg(elenPos), );
290 
291         if (!readsByAdapter.contains(a)) {
292             readsByAdapter[a] = QList<U2DataId>();
293         }
294         readsByAdapter[a].append(readId);
295     }
296 
297     foreach (MysqlMtaSingleTableAdapter *a, readsByAdapter.keys()) {
298         QList<U2DataId> &rangeReadIds = readsByAdapter[a];
299         a->singleTableAdapter->removeReads(rangeReadIds, os);
300         // TODO: remove adapters for empty tables. And tables as well
301     }
302 }
303 
dropReadsTables(U2OpStatus & os)304 void MysqlMultiTableAssemblyAdapter::dropReadsTables(U2OpStatus &os) {
305     for (const QVector<MysqlMtaSingleTableAdapter *> &adaptersVector : qAsConst(adaptersGrid)) {
306         for (MysqlMtaSingleTableAdapter *adapter : qAsConst(adaptersVector)) {
307             if (adapter != nullptr) {
308                 adapter->singleTableAdapter->dropReadsTables(os);
309             }
310         }
311     }
312 }
313 
pack(U2AssemblyPackStat & stat,U2OpStatus & os)314 void MysqlMultiTableAssemblyAdapter::pack(U2AssemblyPackStat &stat, U2OpStatus &os) {
315     MysqlMultiTablePackAlgorithmAdapter packAdapter(this);
316 
317     AssemblyPackAlgorithm::pack(packAdapter, stat, os);
318     CHECK_OP(os, );
319     packAdapter.releaseDbResources();
320 
321     quint64 t0 = GTimer::currentTimeMicros();
322     packAdapter.migrateAll(os);
323     CHECK_OP(os, );
324     perfLog.trace(QString("Assembly: table migration pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
325 
326     t0 = GTimer::currentTimeMicros();
327     // if new tables created during the pack algorithm -> create indexes
328     createReadsIndexes(os);
329     CHECK_OP(os, );
330     perfLog.trace(QString("Assembly: re-indexing pack time: %1 seconds").arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000)));
331 
332     flushTables(os);
333 }
334 
calculateCoverage(const U2Region & region,U2AssemblyCoverageStat & coverage,U2OpStatus & os)335 void MysqlMultiTableAssemblyAdapter::calculateCoverage(const U2Region &region, U2AssemblyCoverageStat &coverage, U2OpStatus &os) {
336     for (int i = 0; i < adapters.size(); ++i) {
337         MysqlMtaSingleTableAdapter *a = adapters.at(i);
338         a->singleTableAdapter->calculateCoverage(region, coverage, os);
339         CHECK_OP(os, );
340 
341         os.setProgress((double(i + 1) / adapters.size()) * 100);
342     }
343 }
344 
createReadsIndexes(U2OpStatus & os)345 void MysqlMultiTableAssemblyAdapter::createReadsIndexes(U2OpStatus &os) {
346     foreach (MysqlMtaSingleTableAdapter *a, adapters) {
347         a->singleTableAdapter->createReadsIndexes(os);
348         CHECK_OP(os, );
349     }
350 }
351 
getElenRangePosByLength(qint64 readLength) const352 int MysqlMultiTableAssemblyAdapter::getElenRangePosByLength(qint64 readLength) const {
353     int nElenRanges = elenRanges.size();
354     for (int i = 0; i < nElenRanges; i++) {
355         const U2Region &r = elenRanges[i];
356         if (r.contains(readLength)) {
357             return i;
358         }
359     }
360 
361     FAIL(QString("Read length does not fit any range: %1, number of ranges: %2").arg(readLength).arg(nElenRanges), nElenRanges - 1);
362 }
363 
getElenRangePosById(const U2DataId & id) const364 int MysqlMultiTableAssemblyAdapter::getElenRangePosById(const U2DataId &id) const {
365     QByteArray extra = U2DbiUtils::toDbExtra(id);
366 
367     SAFE_POINT(extra.size() == 4, QString("Illegal assembly read ID extra part. HEX: %1").arg(extra.toHex().constData()), -1);
368 
369     const qint16 *data = (const qint16 *)extra.constData();
370     return int(data[1]);
371 }
372 
/** Returns the number of effective-length ranges, i.e. the number of columns of the adapters grid. */
int MysqlMultiTableAssemblyAdapter::getNumberOfElenRanges() const {
    return elenRanges.size();
}
376 
getRowRangePosByRow(quint64 row) const377 int MysqlMultiTableAssemblyAdapter::getRowRangePosByRow(quint64 row) const {
378     return row / rowsPerRange;
379 }
380 
getRowRangePosById(const U2DataId & id) const381 int MysqlMultiTableAssemblyAdapter::getRowRangePosById(const U2DataId &id) const {
382     QByteArray extra = U2DbiUtils::toDbExtra(id);
383 
384     SAFE_POINT(extra.size() == 4, QString("Extra part size of assembly read ID is not correct. HEX(Extra): %1").arg(extra.toHex().constData()), -1);
385 
386     const qint16 *data = (const qint16 *)extra.constData();
387     return int(data[0]);
388 }
389 
/** Returns the number of packed rows covered by a single row range. */
int MysqlMultiTableAssemblyAdapter::getRowsPerRange() const {
    return rowsPerRange;
}
393 
/** Returns all created table adapters, in creation order. */
const QVector<MysqlMtaSingleTableAdapter *> &MysqlMultiTableAssemblyAdapter::getAdapters() const {
    return adapters;
}
/** Returns the read ID extras of the created table adapters; entries are appended together with the adapters. */
const QVector<QByteArray> &MysqlMultiTableAssemblyAdapter::getIdExtrasPerRange() const {
    return idExtras;
}
400 
/** Returns the database reference of the DBI this adapter belongs to. */
MysqlDbRef *MysqlMultiTableAssemblyAdapter::getDbRef() const {
    return dbi->getDbRef();
}
404 
getAdapterByRowAndElenRange(int rowPos,int elenPos,bool createIfNotExits,U2OpStatus & os)405 MysqlMtaSingleTableAdapter *MysqlMultiTableAssemblyAdapter::getAdapterByRowAndElenRange(int rowPos, int elenPos, bool createIfNotExits, U2OpStatus &os) {
406     int nElens = elenRanges.size();
407     SAFE_POINT(elenPos < nElens, "Out of range", nullptr);
408     if (rowPos >= adaptersGrid.size()) {
409         SAFE_POINT(createIfNotExits, "Adapter is not exists", nullptr);
410         int oldRowSize = adaptersGrid.size();
411         int newRowSize = rowPos + 1;
412         adaptersGrid.resize(newRowSize);
413         for (int i = oldRowSize; i < newRowSize; i++) {
414             adaptersGrid[i].resize(nElens);
415         }
416     }
417 
418     QVector<MysqlMtaSingleTableAdapter *> elenAdapters = adaptersGrid.at(rowPos);
419     SAFE_POINT(elenAdapters.size() == nElens, "Invalid adapters array", nullptr);
420     MysqlMtaSingleTableAdapter *adapter = elenAdapters.at(elenPos);
421     if (adapter == nullptr && createIfNotExits) {
422         adapter = createAdapter(rowPos, elenPos, os);
423     }
424 
425     return adapter;
426 }
427 
getTableSuffix(int rowPos,int elenPos)428 QString MysqlMultiTableAssemblyAdapter::getTableSuffix(int rowPos, int elenPos) {
429     SAFE_POINT(0 <= elenPos && elenPos < elenRanges.size(), "Out of range", "");
430 
431     const U2Region eRegion = elenRanges[elenPos];
432     bool last = (elenPos + 1 == elenRanges.size());
433     return QString("%1_%2_%3").arg(eRegion.startPos).arg(last ? QString("U") : QString::number(eRegion.endPos())).arg(rowPos);
434 }
435 
getIdExtra(int rowPos,int elenPos)436 QByteArray MysqlMultiTableAssemblyAdapter::getIdExtra(int rowPos, int elenPos) {
437     QByteArray res(4, 0);
438     qint16 *data = (qint16 *)res.data();
439     data[0] = (qint16)rowPos;
440     data[1] = (qint16)elenPos;
441     return res;
442 }
443 
syncTables(U2OpStatus & os)444 void MysqlMultiTableAssemblyAdapter::syncTables(U2OpStatus &os) {
445     qint64 versionInDb = dbi->getObjectDbi()->getObjectVersion(assemblyId, os);
446     CHECK_OP(os, );
447 
448     if (versionInDb <= version) {
449         return;
450     }
451 
452     static const QString queryString = "SELECT idata FROM Assembly WHERE object = :object";
453     U2SqlQuery q(queryString, db, os);
454     q.bindDataId(":object", assemblyId);
455     if (q.step()) {
456         const QByteArray data = q.getBlob(0);
457         rereadTables(data, os);
458         CHECK_OP_EXT(os, version = versionInDb, );
459     }
460 }
461 
initTables(const QList<U2AssemblyRead> & reads,U2OpStatus & os)462 void MysqlMultiTableAssemblyAdapter::initTables(const QList<U2AssemblyRead> &reads, U2OpStatus &os) {
463     SAFE_POINT(elenRanges.isEmpty(), "Effective ranges are already initialized", );
464 
465     const int nReads = reads.size();
466     // TODO
467     if (false && nReads > 1000) {
468         //        // get reads distribution first
469         //        QVector<int> distribution(nReads / 10, 0);
470         //        int* data = distribution.data();
471         //        foreach(const U2AssemblyRead& read, reads) {
472         //            int elen = read->readSequence.size() + U2AssemblyUtils::getCigarExtraLength(read->cigar);
473         //            int idx = elen / 10;
474         //            data[idx]++;
475         //        }
476         //        // derive regions
477         //        // TODO:
478     } else {
479         QVector<int> starts;
480         starts << 50 << 100 << 200 << 400 << 800 << 4 * 1000 << 25 * 1000 << 100 * 1000 << 500 * 1000 << 2 * 1000 * 1000;
481         elenRanges << toRange(starts);
482     }
483 
484     initAdaptersGrid(1, elenRanges.size());
485     flushTables(os);
486 }
487 
rereadTables(const QByteArray & idata,U2OpStatus & os)488 void MysqlMultiTableAssemblyAdapter::rereadTables(const QByteArray &idata, U2OpStatus &os) {
489     QWriteLocker wl(&tablesSyncLock);
490 
491     clearTableAdaptersInfo();
492 
493     // format: N, N, N, N | N, N |.....
494     // elements are separated by | sign. First element encodes ranges, second prow step and max prow, others are for future extension
495     if (idata.isEmpty()) {
496         // assembly is empty - no index data was created
497         return;
498     }
499 
500     const QList<QByteArray> elements = idata.split('|');
501     if (elements.size() < 2) {
502         os.setError(U2DbiL10n::tr("Failed to detect assembly storage format: %1").arg(idata.constData()));
503         return;
504     }
505     QByteArray elenData = elements.at(0);
506     QByteArray prowData = elements.at(1);
507 
508     const QList<QByteArray> elenTokens = elenData.split(',');
509     U2Region prev(-1, 1);
510     bool parseOk = true;
511     foreach (const QByteArray &elenTok, elenTokens) {
512         int start = elenTok.toInt(&parseOk);
513         if (!parseOk || start < prev.endPos()) {
514             os.setError(U2DbiL10n::tr("Failed to parse range: %1, full: %2").arg(elenTok.constData()).arg(elenData.constData()));
515             return;
516         }
517 
518         U2Region region(prev.endPos(), start - prev.endPos());
519         elenRanges << region;
520         prev = region;
521     }
522     elenRanges << U2Region(prev.endPos(), INT_MAX);
523 
524     const QList<QByteArray> prowTokens = prowData.split(',');
525     int prange = prowTokens.at(0).toInt(&parseOk);
526 
527     if (prange < 1 || !parseOk) {
528         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
529         return;
530     }
531 
532     if (prowTokens.size() != 2) {
533         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
534         return;
535     }
536 
537     int nRows = prowTokens.at(1).toInt(&parseOk);
538     if (nRows < 0 || !parseOk) {
539         os.setError(U2DbiL10n::tr("Failed to parse packed row range info %1").arg(idata.constData()));
540         return;
541     }
542 
543     // ok, all regions parsed, now create adapters
544     int nElens = elenRanges.size();
545     initAdaptersGrid(nRows, nElens);
546     for (int rowPos = 0; rowPos < nRows; rowPos++) {
547         for (int elenPos = 0; elenPos < nElens; elenPos++) {
548             const QString suffix = getTableSuffix(rowPos, elenPos);
549             const QString tableName = MysqlSingleTableAssemblyAdapter::getReadsTableName(assemblyId, 'M', suffix);
550             if (MysqlUtils::isTableExists(tableName, db, os)) {
551                 createAdapter(rowPos, elenPos, os);
552             }
553         }
554     }
555 }
556 
flushTables(U2OpStatus & os)557 void MysqlMultiTableAssemblyAdapter::flushTables(U2OpStatus &os) {
558     MysqlTransaction t(db, os);
559     Q_UNUSED(t);
560 
561     if (adaptersGrid.isEmpty()) {
562         // TODO: fetch some reads for analysis. By now not needed, since regions are hard-coded anyway
563         initTables(QList<U2AssemblyRead>(), os);
564         CHECK_OP(os, );
565     }
566 
567     QByteArray idata;
568     for (int i = 0; i < elenRanges.size(); i++) {
569         int rangeStart = elenRanges[i].startPos;
570         if (!idata.isEmpty()) {
571             idata += ',';
572         }
573         idata += QByteArray::number(rangeStart);
574     }
575     idata += '|' + QByteArray::number(rowsPerRange) + ',' + QByteArray::number(adaptersGrid.size());
576 
577     static const QString queryString = "UPDATE Assembly SET idata = :idata WHERE object = :object";
578     U2SqlQuery q(queryString, db, os);
579     q.bindBlob(":idata", idata);
580     q.bindDataId(":object", assemblyId);
581     q.execute();
582 }
583 
clearTableAdaptersInfo()584 void MysqlMultiTableAssemblyAdapter::clearTableAdaptersInfo() {
585     qDeleteAll(adapters);
586     adaptersGrid.clear();
587     idExtras.clear();
588     elenRanges.clear();
589 }
590 
createAdapter(int rowPos,int elenPos,U2OpStatus & os)591 MysqlMtaSingleTableAdapter *MysqlMultiTableAssemblyAdapter::createAdapter(int rowPos, int elenPos, U2OpStatus &os) {
592     SAFE_POINT(0 <= rowPos && rowPos < adaptersGrid.size(), "Out of range", nullptr);
593     SAFE_POINT(0 <= elenPos && elenPos < adaptersGrid.at(rowPos).size(), "Out of range", nullptr);
594     SAFE_POINT(nullptr == adaptersGrid.at(rowPos).at(elenPos), "Adapter is already created", nullptr);
595 
596     const QString suffix = getTableSuffix(rowPos, elenPos);
597     const U2Region &elenRange = elenRanges.at(elenPos);
598     const QByteArray idExtra = getIdExtra(rowPos, elenPos);
599 
600     MysqlSingleTableAssemblyAdapter *sa = new MysqlSingleTableAssemblyAdapter(dbi, assemblyId, 'M', suffix, compressor, db, os);
601     sa->enableRangeTableMode(elenRange.startPos, elenRange.endPos());
602 
603     MysqlMtaSingleTableAdapter *ma = new MysqlMtaSingleTableAdapter(sa, rowPos, elenPos, idExtra);
604 
605     adapters << ma;
606     idExtras << idExtra;
607     adaptersGrid[rowPos][elenPos] = ma;
608 
609     return ma;
610 }
611 
initAdaptersGrid(int nRows,int nElens)612 void MysqlMultiTableAssemblyAdapter::initAdaptersGrid(int nRows, int nElens) {
613     SAFE_POINT(adaptersGrid.isEmpty(), "Adapters are already initialized", );
614 
615     adaptersGrid.resize(nRows);
616     for (int i = 0; i < nRows; i++) {
617         adaptersGrid[i] = QVector<MysqlMtaSingleTableAdapter *>(nElens, nullptr);
618     }
619 }
620 
621 //////////////////////////////////////////////////////////////////////////
622 // MysqlReadTableMigrationData
623 
MysqlReadTableMigrationData()624 MysqlReadTableMigrationData::MysqlReadTableMigrationData()
625     : readId(-1),
626       oldTable(nullptr),
627       newProw(-1) {
628 }
629 
MysqlReadTableMigrationData(qint64 oldId,MysqlMtaSingleTableAdapter * oldT,int newP)630 MysqlReadTableMigrationData::MysqlReadTableMigrationData(qint64 oldId, MysqlMtaSingleTableAdapter *oldT, int newP)
631     : readId(oldId),
632       oldTable(oldT),
633       newProw(newP) {
634 }
635 
636 //////////////////////////////////////////////////////////////////////////
637 // pack adapter
638 
MysqlMultiTablePackAlgorithmAdapter(MysqlMultiTableAssemblyAdapter * ma)639 MysqlMultiTablePackAlgorithmAdapter::MysqlMultiTablePackAlgorithmAdapter(MysqlMultiTableAssemblyAdapter *ma)
640     : multiTableAdapter(ma) {
641     MysqlDbRef *db = multiTableAdapter->getDbRef();
642     int nElens = multiTableAdapter->getNumberOfElenRanges();
643     ensureGridSize(nElens);
644 
645     foreach (MysqlMtaSingleTableAdapter *a, multiTableAdapter->getAdapters()) {
646         MysqlSingleTablePackAlgorithmAdapter *sa = new MysqlSingleTablePackAlgorithmAdapter(db, a->singleTableAdapter->getReadsTableName());
647         packAdapters << sa;
648         if (packAdaptersGrid.size() <= a->rowPos) {
649             packAdaptersGrid.resize(a->rowPos + 1);
650         }
651 
652         if (packAdaptersGrid[a->rowPos].size() <= a->elenPos) {
653             packAdaptersGrid[a->rowPos].resize(a->elenPos + 1);
654         }
655 
656         packAdaptersGrid[a->rowPos][a->elenPos] = sa;
657     }
658 }
659 
/** Deletes all single-table pack adapters created by this object. */
MysqlMultiTablePackAlgorithmAdapter::~MysqlMultiTablePackAlgorithmAdapter() {
    qDeleteAll(packAdapters);
}
663 
selectAllReads(U2OpStatus & os)664 U2DbiIterator<PackAlgorithmData> *MysqlMultiTablePackAlgorithmAdapter::selectAllReads(U2OpStatus &os) {
665     QVector<U2DbiIterator<PackAlgorithmData> *> iterators;
666     foreach (MysqlSingleTablePackAlgorithmAdapter *a, packAdapters) {
667         iterators << a->selectAllReads(os);
668     }
669 
670     return new MysqlMTAPackAlgorithmDataIterator(iterators, multiTableAdapter->getIdExtrasPerRange());
671 }
672 
assignProw(const U2DataId & readId,qint64 prow,U2OpStatus & os)673 void MysqlMultiTablePackAlgorithmAdapter::assignProw(const U2DataId &readId, qint64 prow, U2OpStatus &os) {
674     int elenPos = multiTableAdapter->getElenRangePosById(readId);
675     int oldRowPos = multiTableAdapter->getRowRangePosById(readId);
676     int newRowPos = multiTableAdapter->getRowRangePosByRow(prow);
677 
678     MysqlSingleTablePackAlgorithmAdapter *sa = nullptr;
679     if (newRowPos == oldRowPos) {
680         sa = packAdaptersGrid[oldRowPos][elenPos];
681         sa->assignProw(readId, prow, os);
682         return;
683     }
684     ensureGridSize(newRowPos + 1);
685 
686     sa = packAdaptersGrid[newRowPos][elenPos];
687     MysqlMtaSingleTableAdapter *oldA = multiTableAdapter->getAdapterByRowAndElenRange(oldRowPos, elenPos, false, os);
688     MysqlMtaSingleTableAdapter *newA = multiTableAdapter->getAdapterByRowAndElenRange(newRowPos, elenPos, true, os);
689 
690     SAFE_POINT(oldA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(oldRowPos).arg(elenPos), );
691     SAFE_POINT(newA != nullptr, QString("Can't find reads table adapter: row: %1, elen: %2").arg(newRowPos).arg(elenPos), );
692     SAFE_POINT_OP(os, );
693 
694     if (sa == nullptr) {
695         sa = new MysqlSingleTablePackAlgorithmAdapter(multiTableAdapter->getDbRef(), newA->singleTableAdapter->getReadsTableName());
696         packAdapters << sa;
697         packAdaptersGrid[newRowPos][elenPos] = sa;
698     }
699 
700     QVector<MysqlReadTableMigrationData> &newTableData = migrations[newA];
701     newTableData.append(MysqlReadTableMigrationData(U2DbiUtils::toDbiId(readId), oldA, prow));
702     // TODO: add mem check here!
703 }
704 
releaseDbResources()705 void MysqlMultiTablePackAlgorithmAdapter::releaseDbResources() {
706     foreach (MysqlSingleTablePackAlgorithmAdapter *a, packAdapters) {
707         a->releaseDbResources();
708     }
709 }
710 
migrateAll(U2OpStatus & os)711 void MysqlMultiTablePackAlgorithmAdapter::migrateAll(U2OpStatus &os) {
712     qint64 nReadsToMigrate = 0;
713     foreach (MysqlMtaSingleTableAdapter *newTable, migrations.keys()) {
714         const QVector<MysqlReadTableMigrationData> &data = migrations[newTable];
715         nReadsToMigrate += data.size();
716     }
717 
718     if (nReadsToMigrate == 0) {
719         return;
720     }
721 
722     qint64 nReadsTotal = multiTableAdapter->countReads(U2_REGION_MAX, os);
723     qint64 migrationPercent = nReadsToMigrate * 100 / nReadsTotal;
724 
725     perfLog.trace(QString("Assembly: starting reads migration process. Reads to migrate: %1, total: %2 (%3%)").arg(nReadsToMigrate).arg(nReadsTotal).arg(migrationPercent));
726 
727     if (migrationPercent > MAX_PERCENT_TO_REINDEX) {
728         perfLog.trace("Assembly: dropping old indexes first");
729         foreach (MysqlMtaSingleTableAdapter *adapter, multiTableAdapter->getAdapters()) {
730             adapter->singleTableAdapter->dropReadsIndexes(os);
731         }
732         perfLog.trace("Assembly: indexes are dropped");
733     }
734 
735     SAFE_POINT_OP(os, );
736     int nMigrated = 0;
737     foreach (MysqlMtaSingleTableAdapter *newTable, migrations.keys()) {
738         const QVector<MysqlReadTableMigrationData> &data = migrations[newTable];
739         migrate(newTable, data, nMigrated, nReadsToMigrate, os);
740         nMigrated += data.size();
741     }
742     migrations.clear();
743 }
744 
ensureGridSize(int nRows)745 void MysqlMultiTablePackAlgorithmAdapter::ensureGridSize(int nRows) {
746     int oldNRows = packAdaptersGrid.size();
747     if (oldNRows < nRows) {
748         int nElens = multiTableAdapter->getNumberOfElenRanges();
749         packAdaptersGrid.resize(nRows);
750         for (int i = oldNRows; i < nRows; i++) {
751             packAdaptersGrid[i].resize(nElens);
752         }
753     }
754 }
755 
migrate(MysqlMtaSingleTableAdapter * newA,const QVector<MysqlReadTableMigrationData> & data,qint64 migratedBefore,qint64 totalMigrationCount,U2OpStatus & os)756 void MysqlMultiTablePackAlgorithmAdapter::migrate(MysqlMtaSingleTableAdapter *newA, const QVector<MysqlReadTableMigrationData> &data, qint64 migratedBefore, qint64 totalMigrationCount, U2OpStatus &os) {
757     // delete reads from old tables, and insert into new one
758     QHash<MysqlMtaSingleTableAdapter *, QVector<MysqlReadTableMigrationData>> readsByOldTable;
759     foreach (const MysqlReadTableMigrationData &d, data) {
760         readsByOldTable[d.oldTable].append(d);
761     }
762 
763     MysqlDbRef *db = multiTableAdapter->getDbRef();
764     MysqlTransaction t(db, os);
765     Q_UNUSED(t);
766 
767     foreach (MysqlMtaSingleTableAdapter *oldA, readsByOldTable.keys()) {
768         const QVector<MysqlReadTableMigrationData> &migData = readsByOldTable[oldA];
769         if (migData.isEmpty()) {
770             continue;
771         }
772 
773         QString oldTable = oldA->singleTableAdapter->getReadsTableName();
774         QString newTable = newA->singleTableAdapter->getReadsTableName();
775         QString idsTable = "tmp_mig_" + oldTable;  // TODO
776 
777 #ifdef _DEBUG
778         qint64 nOldReads1 = U2SqlQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
779         qint64 nNewReads1 = U2SqlQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
780         int readsMoved = migData.size();
781         int rowsPerRange = multiTableAdapter->getRowsPerRange();
782         U2Region newProwRegion(newA->rowPos * rowsPerRange, rowsPerRange);
783 #endif
784 
785         perfLog.trace(QString("Assembly: running reads migration from %1 to %2 number of reads: %3").arg(oldTable).arg(newTable).arg(migData.size()));
786         quint64 t0 = GTimer::currentTimeMicros();
787 
788         {  // nested block is needed to ensure all queries are finalized
789 
790             static const QString tempTableQuery = "CREATE TEMPORARY TABLE %1(id INTEGER PRIMARY KEY, prow INTEGER NOT NULL)";
791             U2SqlQuery(tempTableQuery.arg(idsTable), db, os).execute();
792             CHECK_OP(os, );
793 
794             static const QString insertQuery = "INSERT INTO %1(id, prow) VALUES(:id, :prow)";
795             for (const MysqlReadTableMigrationData &d : qAsConst(migData)) {
796                 U2SqlQuery insertIds(insertQuery.arg(idsTable), db, os);
797                 insertIds.bindInt64(":id", d.readId);
798                 insertIds.bindInt32(":prow", d.newProw);
799 #ifdef _DEBUG
800                 SAFE_POINT(newProwRegion.contains(d.newProw), "Invalid region", );
801 #endif
802                 insertIds.execute();
803                 CHECK_OP_BREAK(os);
804             }
805 
806             if (!os.isCoR()) {
807                 static const QString insertString = "INSERT INTO %1(prow, name, gstart, elen, flags, mq, data) "
808                                                     "SELECT %3.prow, name, gstart, elen, flags, mq, data FROM %2, %3 WHERE %2.id = %3.id";
809                 U2SqlQuery(insertString.arg(newTable).arg(oldTable).arg(idsTable), db, os).execute();
810 
811                 static const QString deleteString = "DELETE A.* FROM %1 AS A INNER JOIN %2 AS B ON A.id = B.id";
812                 U2SqlQuery(deleteString.arg(oldTable).arg(idsTable), db, os).execute();
813             }
814         }
815         U2OpStatusImpl osStub;  // using stub here -> this operation must be performed even if any of internal queries failed
816         static const QString dropTableString = "DROP TABLE IF EXISTS %1";
817         U2SqlQuery(dropTableString.arg(idsTable), db, osStub).execute();
818 
819         qint64 nMigrated = migratedBefore + migData.size();
820         perfLog.trace(QString("Assembly: reads migration from %1 to %2 finished, time %3 seconds, progress: %4/%5 (%6%)")
821                           .arg(oldTable)
822                           .arg(newTable)
823                           .arg((GTimer::currentTimeMicros() - t0) / float(1000 * 1000))
824                           .arg(nMigrated)
825                           .arg(totalMigrationCount)
826                           .arg(100 * nMigrated / totalMigrationCount));
827 
828 #ifdef _DEBUG
829         qint64 nOldReads2 = U2SqlQuery("SELECT COUNT(*) FROM " + oldTable, db, os).selectInt64();
830         qint64 nNewReads2 = U2SqlQuery("SELECT COUNT(*) FROM " + newTable, db, os).selectInt64();
831         SAFE_POINT(nOldReads1 + nNewReads1 == nOldReads2 + nNewReads2, "Invalid reads count", );
832         SAFE_POINT(nNewReads1 + readsMoved == nNewReads2, "Invalid reads count", );
833 #endif
834     }
835 }
836 
837 //////////////////////////////////////////////////////////////////////////
838 // MTAReadsIterator
839 
MysqlMtaReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> * > & i,const QVector<QByteArray> & ie,bool sorted)840 MysqlMtaReadsIterator::MysqlMtaReadsIterator(QVector<U2DbiIterator<U2AssemblyRead> *> &i, const QVector<QByteArray> &ie, bool sorted)
841     : iterators(i), currentRange(0), idExtras(ie), sortedHint(sorted) {
842 }
843 
~MysqlMtaReadsIterator()844 MysqlMtaReadsIterator::~MysqlMtaReadsIterator() {
845     qDeleteAll(iterators);
846 }
847 
848 // TODO: remove copy-paste from this code
hasNext()849 bool MysqlMtaReadsIterator::hasNext() {
850     if (sortedHint) {
851         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
852             if (it->hasNext()) {
853                 return true;
854             }
855         }
856         return false;
857     } else {
858         bool res = currentRange < iterators.size();
859         if (res) {
860             do {
861                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
862                 res = it->hasNext();
863                 if (res) {
864                     break;
865                 }
866                 currentRange++;
867             } while (currentRange < iterators.size());
868         }
869         return res;
870     }
871 }
872 
next()873 U2AssemblyRead MysqlMtaReadsIterator::next() {
874     U2AssemblyRead res;
875     if (sortedHint) {
876         qint64 minPos = LLONG_MAX;
877         U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
878         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
879             if (it->hasNext()) {
880                 U2AssemblyRead candidate = it->peek();
881                 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
882                 if (candidate->leftmostPos < minPos) {
883                     minIt = it;
884                     minPos = candidate->leftmostPos;
885                 }
886             }
887         }
888         if (nullptr != minIt) {
889             res = minIt->next();
890             SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
891             int currentIt = iterators.indexOf(minIt);
892             const QByteArray &idExtra = idExtras.at(currentIt);
893             res->id = addTable2Id(res->id, idExtra);
894         }
895         return res;
896     } else {
897         if (currentRange < iterators.size()) {
898             do {
899                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
900                 if (it->hasNext()) {
901                     res = it->next();
902                     SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
903                     const QByteArray &idExtra = idExtras.at(currentRange);
904                     res->id = addTable2Id(res->id, idExtra);
905                     break;
906                 }
907                 currentRange++;
908             } while (currentRange < iterators.size());
909         }
910         return res;
911     }
912 }
913 
peek()914 U2AssemblyRead MysqlMtaReadsIterator::peek() {
915     U2AssemblyRead res;
916     if (sortedHint) {
917         qint64 minPos = LLONG_MAX;
918         U2DbiIterator<U2AssemblyRead> *minIt = nullptr;
919         foreach (U2DbiIterator<U2AssemblyRead> *it, iterators) {
920             if (it->hasNext()) {
921                 U2AssemblyRead candidate = it->peek();
922                 SAFE_POINT(nullptr != candidate.data(), "NULL assembly read", candidate);
923                 if (candidate->leftmostPos < minPos) {
924                     minIt = it;
925                     minPos = candidate->leftmostPos;
926                 }
927             }
928         }
929         if (nullptr != minIt) {
930             res = minIt->next();
931             SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
932             int currentIt = iterators.indexOf(minIt);
933             const QByteArray &idExtra = idExtras.at(currentIt);
934             res->id = addTable2Id(res->id, idExtra);
935         }
936         return res;
937     } else {
938         if (currentRange < iterators.size()) {
939             do {
940                 U2DbiIterator<U2AssemblyRead> *it = iterators[currentRange];
941                 if (it->hasNext()) {
942                     res = it->peek();
943                     SAFE_POINT(nullptr != res.data(), "NULL assembly read", res);
944                     const QByteArray &idExtra = idExtras.at(currentRange);
945                     res->id = addTable2Id(res->id, idExtra);
946                     break;
947                 }
948                 currentRange++;
949             } while (currentRange < iterators.size());
950         }
951         return res;
952     }
953 }
954 
955 //////////////////////////////////////////////////////////////////////////
956 // MTAPackAlgorithmDataIterator
957 
MysqlMTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> * > & i,const QVector<QByteArray> & ie)958 MysqlMTAPackAlgorithmDataIterator::MysqlMTAPackAlgorithmDataIterator(QVector<U2DbiIterator<PackAlgorithmData> *> &i, const QVector<QByteArray> &ie)
959     : iterators(i), idExtras(ie) {
960     fetchNextData();
961 }
962 
~MysqlMTAPackAlgorithmDataIterator()963 MysqlMTAPackAlgorithmDataIterator::~MysqlMTAPackAlgorithmDataIterator() {
964     qDeleteAll(iterators);
965 }
966 
hasNext()967 bool MysqlMTAPackAlgorithmDataIterator::hasNext() {
968     return !nextData.readId.isEmpty();
969 }
970 
next()971 PackAlgorithmData MysqlMTAPackAlgorithmDataIterator::next() {
972     PackAlgorithmData res = nextData;
973     fetchNextData();
974     return res;
975 }
976 
peek()977 PackAlgorithmData MysqlMTAPackAlgorithmDataIterator::peek() {
978     return nextData;
979 }
980 
fetchNextData()981 void MysqlMTAPackAlgorithmDataIterator::fetchNextData() {
982     PackAlgorithmData bestCandidate;
983     int bestRange = 0;
984     for (int i = 0; i < iterators.size(); i++) {
985         U2DbiIterator<PackAlgorithmData> *it = iterators[i];
986         if (!it->hasNext()) {
987             continue;
988         }
989         PackAlgorithmData d = it->peek();
990         if (bestCandidate.readId.isEmpty() || bestCandidate.leftmostPos > d.leftmostPos) {
991             bestCandidate = d;
992             bestRange = i;
993         }
994     }
995     nextData = bestCandidate;
996     if (!nextData.readId.isEmpty()) {
997         iterators[bestRange]->next();
998         const QByteArray &idExtra = idExtras.at(bestRange);
999         nextData.readId = addTable2Id(nextData.readId, idExtra);
1000     }
1001 }
1002 
1003 }  // namespace U2
1004