/*===========================================================================
 *
 *                            PUBLIC DOMAIN NOTICE
 *               National Center for Biotechnology Information
 *
 *  This software/database is a "United States Government Work" under the
 *  terms of the United States Copyright Act.  It was written as part of
 *  the author's official duties as a United States Government employee and
 *  thus cannot be copyrighted.  This software/database is freely available
 *  to the public for use. The National Library of Medicine and the U.S.
 *  Government have not placed any restriction on its use or reproduction.
 *
 *  Although all reasonable efforts have been taken to ensure the accuracy
 *  and reliability of the software and data, the NLM and the U.S.
 *  Government do not and cannot warrant the performance or results that
 *  may be obtained by using this software or data. The NLM and the U.S.
 *  Government disclaim all warranties, express or implied, including
 *  warranties of performance, merchantability or fitness for any particular
 *  purpose.
 *
 *  Please cite the author in any work or product based on this material.
 *
 * ===========================================================================
 *
 */

#include <klib/rc.h>
#include <klib/log.h>
#include <kfs/file.h>

#include <vdb/schema.h>
#include <vdb/table.h>
#include <vdb/cursor.h>

#include <align/writer-refseq.h>

#include <loader/reference-writer.h>

#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <assert.h>
#include <ctype.h>

#define SORTED_OPEN_TABLE_LIMIT (2)
#define SORTED_CACHE_SIZE ((2 * 1024 * 1024)/(SORTED_OPEN_TABLE_LIMIT))

#ifdef __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__
#define UNSORTED_OPEN_TABLE_LIMIT (8)
#define UNSORTED_CACHE_SIZE ((1024 * 1024 * 1024)/(UNSORTED_OPEN_TABLE_LIMIT))
#else
#define UNSORTED_OPEN_TABLE_LIMIT (64)
#define UNSORTED_CACHE_SIZE (350 * 1024 * 1024)
#endif
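
/* Sorted input is flushed chunk by chunk as it arrives, so ReferenceInit can
 * run it with a small cache and only a couple of open reference tables.
 * Unsorted input keeps references in play much longer, hence the larger
 * limits above.  (The reduced macOS values presumably accommodate that
 * platform's tighter default resource limits; that rationale is an
 * assumption, not documented in the original.)
 */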

#if _DEBUGGING
#define DUMP_CONFIG 1
#endif

struct overlap_s {
    uint32_t min; /* minimum start pos of any alignment that ends in this chunk */
    uint32_t max; /* maximum end pos of any alignment that starts before this chunk and ends in this chunk */
};
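
/* The reference is accumulated in chunks of maxSeqLen bases, with one
 * overlap_s per chunk.  When a chunk is flushed (see FlushBuffers), min/max
 * become that chunk's overlap_ref_pos/overlap_ref_len values, which
 * presumably feed the OVERLAP_REF_* data of the coverage row.
 */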

extern void ReferenceMgr_DumpConfig(ReferenceMgr const *const self);

rc_t ReferenceInit(Reference *self,
                   const VDBManager *mgr,
                   VDatabase *db,
                   bool expectUnsorted,
                   bool acceptHardClip,
                   char const *refXRefPath,
                   char const *inpath,
                   uint32_t maxSeqLen,
                   char const** refFiles
                   )
{
    rc_t rc;
    size_t const cache = expectUnsorted ? UNSORTED_CACHE_SIZE : SORTED_CACHE_SIZE;
    unsigned const open_count = expectUnsorted ? UNSORTED_OPEN_TABLE_LIMIT : SORTED_OPEN_TABLE_LIMIT;

    memset(self, 0, sizeof(*self));

    self->coverage.elem_bits = self->mismatches.elem_bits = self->indels.elem_bits = 32;
    self->pri_align.elem_bits = self->sec_align.elem_bits = 64;
    self->pri_overlap.elem_bits = self->sec_overlap.elem_bits = sizeof(struct overlap_s) * 8;

    self->acceptHardClip = acceptHardClip;

    rc = ReferenceMgr_Make(&self->mgr, db, mgr,
                           ewrefmgr_co_Coverage,
                           refXRefPath, inpath,
                           maxSeqLen, cache, open_count);
    if (rc == 0 && refFiles != NULL) {
        unsigned i;

        for (i = 0; refFiles[i]; ++i) {
            rc = ReferenceMgr_FastaPath(self->mgr, refFiles[i]);
            if (rc) {
                (void)PLOGERR(klogWarn, (klogWarn, rc, "fasta file '$(file)'", "file=%s", refFiles[i]));
                break;
            }
        }
#if DUMP_CONFIG
        if (rc == 0) {
            ReferenceMgr_DumpConfig(self->mgr);
        }
#endif
    }
    return rc;
}

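/* Rough call pattern, as inferred from this file (error handling and the
 * surrounding loader context omitted; a sketch, not a normative example):
 *
 *   Reference ref;
 *   ReferenceInit(&ref, mgr, db, expectUnsorted, acceptHardClip,
 *                 refXRefPath, inpath, maxSeqLen, refFiles);
 *   for each reference spanned by the input:
 *       ReferenceSetFile(&ref, id, length, md5, maxSeqLen, &shouldUnmap);
 *       for each alignment on that reference:
 *           ReferenceRead(&ref, &data, pos, cigar, cigCount, seq, seqLen,
 *                         rna_orient, &matches, acceptNoMatch,
 *                         minMatchCount, maxSeqLen);
 *           ReferenceAddAlignId(&ref, align_id, isPrimary);
 *   ReferenceWhack(&ref, commit, maxSeqLen, quitting);
 *
 * If alignments turn out to be unsorted, Unsorted() below switches the writer
 * into a mode where per-chunk coverage is not accumulated here but recomputed
 * when the ReferenceMgr is released.
 */
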
static
void Unsorted(Reference *self) {
    bool dummy1 = false;
    bool dummy2 = false;

    (void)LOGMSG(klogWarn, "Alignments are unsorted");

    self->out_of_order = true;

    ReferenceSeq_Release(self->rseq);
    ReferenceMgr_SetCache(self->mgr, UNSORTED_CACHE_SIZE, UNSORTED_OPEN_TABLE_LIMIT);
    ReferenceMgr_GetSeq(self->mgr, &self->rseq, self->last_id, &dummy1, true, &dummy2);

    KDataBufferWhack(&self->sec_align);
    KDataBufferWhack(&self->pri_align);
    KDataBufferWhack(&self->mismatches);
    KDataBufferWhack(&self->indels);
    KDataBufferWhack(&self->coverage);
    KDataBufferWhack(&self->pri_overlap);
    KDataBufferWhack(&self->sec_overlap);
}

#define BAIL_ON_FAIL(STMT) do { rc_t const rc__ = (STMT); if (rc__) return rc__; } while(0)

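/* Write out completed coverage chunks.  For every maxSeqLen-sized chunk that
 * is complete below 'upto' (or every remaining chunk when 'full' is set),
 * build a ReferenceSeqCoverage record from the accumulated coverage,
 * mismatch, indel, alignment-id and overlap buffers and hand it to
 * ReferenceSeq_AddCoverage.  Unless this is the final flush, the unflushed
 * tail of the buffers is then shifted down and curPos advanced.  If
 * AddCoverage fails, the input is presumed unsorted and the writer switches
 * to out-of-order mode instead of failing.
 */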
static rc_t FlushBuffers(Reference *self, uint64_t upto, bool full, bool final, uint32_t maxSeqLen)
{
    if (!self->out_of_order && upto > 0) {
        size_t offset = 0;
        unsigned *const miss = (unsigned *)self->mismatches.base;
        unsigned *const indel = (unsigned *)self->indels.base;
        unsigned *const cov = (unsigned *)self->coverage.base;
        struct overlap_s *const pri_overlap = (struct overlap_s *)self->pri_overlap.base;
        struct overlap_s *const sec_overlap = (struct overlap_s *)self->sec_overlap.base;
        unsigned chunk = 0;

        while ((self->curPos + offset + (full ? 0 : maxSeqLen)) <= upto) {
            ReferenceSeqCoverage data;
            uint64_t const curPos = self->curPos + offset;
            unsigned const n = self->endPos > (curPos + maxSeqLen) ?
                               maxSeqLen : (self->endPos - curPos);
            unsigned const m = curPos + n > upto ? upto - curPos : n;
            unsigned i;
            unsigned hi;
            unsigned lo;

            if (n == 0) break;

            memset(&data, 0, sizeof(data));

            data.ids[ewrefcov_primary_table].elements = self->pri_align.elem_count;
            data.ids[ewrefcov_primary_table].buffer = self->pri_align.base;
            data.overlap_ref_pos[ewrefcov_primary_table] = pri_overlap[chunk].min;
            data.overlap_ref_len[ewrefcov_primary_table] = pri_overlap[chunk].max ? pri_overlap[chunk].max - curPos : 0;

            data.ids[ewrefcov_secondary_table].elements = self->sec_align.elem_count;
            data.ids[ewrefcov_secondary_table].buffer = self->sec_align.base;
            data.overlap_ref_pos[ewrefcov_secondary_table] = sec_overlap[chunk].min;
            data.overlap_ref_len[ewrefcov_secondary_table] = sec_overlap[chunk].max ? sec_overlap[chunk].max - curPos : 0;

            for (hi = 0, lo = UINT_MAX, i = 0; i != m; ++i) {
                unsigned const coverage = cov[offset + i];

                if (hi < coverage)
                    hi = coverage;
                if (lo > coverage)
                    lo = coverage;
            }
            data.low  = lo > 255 ? 255 : lo;
            data.high = hi > 255 ? 255 : hi;

            for (i = 0; i != m; ++i)
                data.mismatches += miss[offset + i];

            for (i = 0; i != m; ++i)
                data.indels += indel[offset + i];

            {
                rc_t rc = ReferenceSeq_AddCoverage(self->rseq, curPos, &data);

                if (rc) {
                    Unsorted(self);
                    return 0;
                }
            }

            KDataBufferResize(&self->pri_align, 0);
            KDataBufferResize(&self->sec_align, 0);
            offset += n;
            ++chunk;
        }
        if (!final && offset > 0) {
            unsigned const newChunkCount = self->pri_overlap.elem_count - chunk;
            unsigned const newBaseCount = self->endPos - self->curPos - offset;

            memmove(self->pri_overlap.base, pri_overlap + chunk, newChunkCount * sizeof(pri_overlap[0]));
            memmove(self->sec_overlap.base, sec_overlap + chunk, newChunkCount * sizeof(sec_overlap[0]));
            memmove(self->mismatches.base, miss + offset, newBaseCount * sizeof(miss[0]));
            memmove(self->indels.base, indel + offset, newBaseCount * sizeof(indel[0]));
            memmove(self->coverage.base, cov + offset, newBaseCount * sizeof(cov[0]));

            KDataBufferResize(&self->pri_overlap, newChunkCount);
            KDataBufferResize(&self->sec_overlap, newChunkCount);

            self->curPos += offset;
        }
    }
    return 0;
}

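/* Switch to a new reference sequence.  If 'id' is the reference already being
 * processed this is a no-op; otherwise the remaining coverage for the old
 * reference is flushed, the new sequence is looked up through the
 * ReferenceMgr, and the per-reference state (current/end positions, length,
 * last offset) is reset.
 */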
rc_t ReferenceSetFile(Reference *self, const char id[],
                      uint64_t length, uint8_t const md5[16],
                      uint32_t maxSeqLen, bool *shouldUnmap)
{
    ReferenceSeq const *rseq;
    bool wasRenamed = false;
    unsigned n;

    for (n = 0; ; ++n) {
        if (self->last_id[n] != id[n])
            break;
        if (self->last_id[n] == 0 && id[n] == 0)
            return 0;
    }
    while (id[n])
        ++n;
    if (n >= sizeof(self->last_id))
        return RC(rcApp, rcTable, rcAccessing, rcParam, rcTooLong);

    BAIL_ON_FAIL(FlushBuffers(self, self->length, true, true, maxSeqLen));
    BAIL_ON_FAIL(ReferenceMgr_GetSeq(self->mgr, &rseq, id, shouldUnmap, true, &wasRenamed));

    if (self->rseq)
        ReferenceSeq_Release(self->rseq);

    self->rseq = rseq;

    memmove(self->last_id, id, n + 1);
    self->curPos = self->endPos = 0;
    self->length = length;
    self->lastOffset = 0;

    if(!self->out_of_order) (void)PLOGMSG(klogInfo, (klogInfo, "Processing Reference '$(id)'", "id=%s", id));

    return 0;
}

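/* Verify that a declared reference (id, length, MD5) matches what the
 * ReferenceMgr can resolve, without making it the current reference.
 */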
rc_t ReferenceVerify(Reference const *self, char const id[], uint64_t length, uint8_t const md5[16])
{
    bool dummy = false;
    return ReferenceMgr_Verify(self->mgr, id, length, md5, true, &dummy);
}

rc_t ReferenceGet1stRow(Reference const *self, int64_t *refID, char const refName[])
{
    ReferenceSeq const *rseq;
    bool shouldUnmap = false;
    bool wasRenamed = false;
    rc_t rc = ReferenceMgr_GetSeq(self->mgr, &rseq, refName, &shouldUnmap, true, &wasRenamed);
    if (rc == 0) {
        assert(shouldUnmap == false);
        rc = ReferenceSeq_Get1stRow(rseq, refID);
        ReferenceSeq_Release(rseq);
    }
    return rc;
}

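/* Accumulate per-base statistics for one placed alignment.  The coverage,
 * mismatch and indel buffers are grown (zero-filled) out to the next
 * maxSeqLen boundary past the alignment's end, as are the per-chunk overlap
 * buffers; chunks completed before refStart are flushed first.  Mismatch and
 * indel counts are charged to the alignment's first base; coverage is
 * incremented (saturating at UINT_MAX) for every base the alignment spans.
 * The overlap entry of the chunk in which the alignment ends records the
 * smallest start and largest end position seen, which FlushBuffers later
 * turns into that chunk's overlap values.
 */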
static
rc_t ReferenceAddCoverage(Reference *const self,
                          unsigned const refStart,
                          unsigned const refLength,
                          uint32_t const mismatches,
                          uint32_t const indels,
                          bool const isPrimary,
                          uint32_t maxSeqLen
                          )
{
    unsigned const refEnd = refStart + refLength;

    if (refEnd > self->endPos) {
        unsigned const t1 = refEnd + (maxSeqLen - 1);
        unsigned const adjust = t1 % maxSeqLen;
        unsigned const newEndPos = t1 - adjust;
        unsigned const baseCount = self->endPos - self->curPos;
        unsigned const newBaseCount = newEndPos - self->curPos;

        BAIL_ON_FAIL(KDataBufferResize(&self->coverage, newBaseCount));
        BAIL_ON_FAIL(KDataBufferResize(&self->mismatches, newBaseCount));
        BAIL_ON_FAIL(KDataBufferResize(&self->indels, newBaseCount));

        memset(&((unsigned *)self->coverage.base)[baseCount], 0, (newBaseCount - baseCount) * sizeof(unsigned));
        memset(&((unsigned *)self->mismatches.base)[baseCount], 0, (newBaseCount - baseCount) * sizeof(unsigned));
        memset(&((unsigned *)self->indels.base)[baseCount], 0, (newBaseCount - baseCount) * sizeof(unsigned));
        self->endPos = newEndPos;
    }
    if ((refEnd - self->curPos) / maxSeqLen >= self->pri_overlap.elem_count) {
        unsigned const chunks = (refEnd - self->curPos) / maxSeqLen + 1;
        unsigned const end = self->pri_overlap.elem_count;

        BAIL_ON_FAIL(KDataBufferResize(&self->pri_overlap, chunks));
        BAIL_ON_FAIL(KDataBufferResize(&self->sec_overlap, chunks));

        memset(&((struct overlap_s *)self->pri_overlap.base)[end], 0, (chunks - end) * sizeof(struct overlap_s));
        memset(&((struct overlap_s *)self->sec_overlap.base)[end], 0, (chunks - end) * sizeof(struct overlap_s));
    }
    BAIL_ON_FAIL(FlushBuffers(self, refStart, false, false, maxSeqLen));
    if (!self->out_of_order) {
        unsigned const startBase = refStart - self->curPos;
        unsigned const endChunk = (startBase + refLength) / maxSeqLen;
        KDataBuffer *const overlapBuffer = isPrimary ? &self->pri_overlap : &self->sec_overlap;
        unsigned *const cov = &((unsigned *)self->coverage.base)[startBase];
        unsigned i;

        ((unsigned *)self->mismatches.base)[startBase] += mismatches;
        ((unsigned *)self->indels.base)[startBase] += indels;

        if (((struct overlap_s *)overlapBuffer->base)[endChunk].min == 0 ||
            ((struct overlap_s *)overlapBuffer->base)[endChunk].min > refStart)
        {
            ((struct overlap_s *)overlapBuffer->base)[endChunk].min = refStart;
        }
        if (endChunk != 0 &&
            ((struct overlap_s *)overlapBuffer->base)[endChunk].max < refStart + refLength)
        {
            ((struct overlap_s *)overlapBuffer->base)[endChunk].max = refStart + refLength;
        }

        for (i = 0; i != refLength; ++i) {
            if (cov[i] < UINT_MAX)
                ++cov[i];
        }
    }
    return 0;
}

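/* Walk the compressed alignment produced by ReferenceSeq_Compress and count
 * matching bases, mismatching bases and indel events.  has_ref_offset marks
 * read positions that carry a reference-offset entry: a negative offset
 * appears to represent inserted read bases (it skips |offset| positions and,
 * when the offset type is 0, is counted as one insertion), while a
 * non-negative offset with type 0 is counted as one deletion.  Remaining
 * positions are classified as match or mismatch via has_mismatch.
 */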
static void GetCounts(AlignmentRecord const *data, unsigned const seqLen,
                      unsigned *const nMatch,
                      unsigned *const nMiss,
                      unsigned *const nIndels)
{
    bool const *const has_mismatch = data->data.has_mismatch.buffer;
    bool const *const has_offset = data->data.has_ref_offset.buffer;
    int32_t const *const ref_offset = data->data.ref_offset.buffer;
    uint8_t const *const ref_offset_type = data->data.ref_offset_type.buffer;
    unsigned misses = 0;
    unsigned matchs = 0;
    unsigned insert = 0;
    unsigned delete = 0;
    unsigned j = 0;
    unsigned i;

    for (i = 0; i < seqLen; ) {
        if (has_offset[i]) {
            int const offs = ref_offset[j];
            int const type = ref_offset_type[j];

            ++j;
            if (type == 0) {
                if (offs < 0)
                    ++insert;
                else
                    ++delete;
            }
            if (offs < 0) {
                i += (unsigned)(-offs);
                continue;
            }
        }
        if (has_mismatch[i])
            ++misses;
        else
            ++matchs;
        ++i;
    }
    *nMatch = matchs;
    *nMiss  = misses;
    *nIndels = insert + delete;
}

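/* Place one alignment on the current reference: compress the read against the
 * reference via ReferenceSeq_Compress (filling data->data), reject alignments
 * with no reference placement unless acceptNoMatch is set, detect unsorted
 * input (a position below the previous effective offset triggers Unsorted()),
 * and, while still in order, enforce minMatchCount and fold the alignment
 * into the coverage statistics.
 */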
rc_t ReferenceRead(Reference *self, AlignmentRecord *data,
                   uint64_t const pos,
                   uint32_t const rawCigar[], uint32_t const cigCount,
                   char const seqDNA[], uint32_t const seqLen,
                   uint8_t const rna_orient, uint32_t *matches,
                   bool acceptNoMatch, unsigned minMatchCount, uint32_t maxSeqLen)
{
    *matches = 0;
    BAIL_ON_FAIL(ReferenceSeq_Compress(self->rseq,
                                       (self->acceptHardClip ? ewrefmgr_co_AcceptHardClip : 0) + ewrefmgr_cmp_Binary,
                                       pos,
                                       seqDNA, seqLen, rawCigar, cigCount, 0, NULL, 0, 0, NULL, 0, rna_orient, &data->data));

    if (!acceptNoMatch && data->data.ref_len == 0)
        return RC(rcApp, rcFile, rcReading, rcConstraint, rcViolated);

    if (!self->out_of_order && pos < self->lastOffset) {
        Unsorted(self);
    }
    if (self->out_of_order)
        return 0;
    else {
        unsigned nmis;
        unsigned nmatch;
        unsigned indels;

        self->lastOffset = data->data.effective_offset;
        GetCounts(data, seqLen, &nmatch, &nmis, &indels);
        *matches = nmatch;

        if (acceptNoMatch || nmatch >= minMatchCount)
            return ReferenceAddCoverage(self, data->data.effective_offset,
                                        data->data.ref_len, nmis, indels,
                                        data->isPrimary, maxSeqLen);
        else
            return RC(rcApp, rcFile, rcReading, rcConstraint, rcViolated);
    }
}

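/* Append one 64-bit id to a KDataBuffer used as a growable array. */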
static rc_t IdVecAppend(KDataBuffer *vec, uint64_t id)
{
    uint64_t const end = vec->elem_count;

    BAIL_ON_FAIL(KDataBufferResize(vec, end + 1));
    ((uint64_t *)vec->base)[end] = id;
    return 0;
}

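/* Record the row id of a just-written alignment so it can be emitted with the
 * coverage row of the current chunk.  In out-of-order mode ids are not
 * collected here; the lists are presumably rebuilt when coverage is
 * recalculated at release time (see ReferenceWhack).
 */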
rc_t ReferenceAddAlignId(Reference *self,
                         int64_t align_id,
                         bool is_primary
                         )
{
    if (self->out_of_order)
        return 0;
    return IdVecAppend(is_primary ? &self->pri_align : &self->sec_align, align_id);
}

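/* Tear down the writer: flush any remaining coverage (when committing), free
 * the accumulation buffers, release the current sequence, and release the
 * ReferenceMgr.  In out-of-order mode the manager is asked to compute
 * coverage itself at this point, which can take a while; hence the log
 * message and the 'quitting' callback.
 */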
rc_t ReferenceWhack(Reference *self, bool commit, uint32_t maxSeqLen,
                    rc_t (*const quitting)(void)
                    )
{
    rc_t rc = 0;

    if (self) {
#if DUMP_CONFIG
        if (self->mgr)
            ReferenceMgr_DumpConfig(self->mgr);
#endif
        if (commit) {
            rc = FlushBuffers(self, self->length, true, true, maxSeqLen);
            if (rc != 0)
                commit = false;
        }
        KDataBufferWhack(&self->sec_align);
        KDataBufferWhack(&self->pri_align);
        KDataBufferWhack(&self->mismatches);
        KDataBufferWhack(&self->indels);
        KDataBufferWhack(&self->coverage);
        KDataBufferWhack(&self->pri_overlap);
        KDataBufferWhack(&self->sec_overlap);
        if (self->rseq)
            rc = ReferenceSeq_Release(self->rseq);
        if (self->out_of_order) {
            (void)LOGMSG(klogInfo, "Starting coverage calculation");
            rc = ReferenceMgr_Release(self->mgr, commit, NULL, true, quitting);
        }
        else {
            rc = ReferenceMgr_Release(self->mgr, commit, NULL, false, NULL);
        }
    }
    return rc;
}