1 
2 /******************************************************************************
3  *
4  *  This file is part of meryl, a genomic k-kmer counter with nice features.
5  *
6  *  This software is based on:
7  *    'Canu' v2.0              (https://github.com/marbl/canu)
8  *  which is based on:
9  *    'Celera Assembler' r4587 (http://wgs-assembler.sourceforge.net)
10  *    the 'kmer package' r1994 (http://kmer.sourceforge.net)
11  *
12  *  Except as indicated otherwise, this is a 'United States Government Work',
13  *  and is released in the public domain.
14  *
15  *  File 'README.licenses' in the root directory of this distribution
16  *  contains full conditions and disclaimers.
17  */
18 
19 #include "meryl-lookup.H"
20 #include "sweatShop.H"
21 
22 
23 
24 class filterInput {
25 public:
filterInput()26   filterInput() {
27   };
~filterInput()28   ~filterInput() {
29   };
30 
31   dnaSeq        seq1;
32   dnaSeq        seq2;
33 
34   uint64        nTotal;
35   uint64        nFound;
36 };
37 
38 
39 
40 
41 static
42 void *
loadSequence(void * G)43 loadSequence(void *G) {
44   lookupGlobal *g = (lookupGlobal *)G;
45   filterInput  *s = new filterInput;
46 
47   bool   load1 = (g->seqFile1 != nullptr) && (g->seqFile1->loadSequence(s->seq1) == true);
48   bool   load2 = (g->seqFile2 != nullptr) && (g->seqFile2->loadSequence(s->seq2) == true);
49 
50   if ((load1 == false) &&
51       (load2 == false)) {
52     delete s;
53     return(nullptr);
54   }
55 
56   return(s);
57 }
58 
59 
60 
61 static
62 uint64
processSequence(merylExactLookup * L,dnaSeq & seq,bool is10x)63 processSequence(merylExactLookup *L, dnaSeq &seq, bool is10x) {
64   kmerIterator kiter(seq.bases(), seq.length());
65   uint64       found = 0;
66 
67   //  Ignore the first 23 kmers in seq
68 
69   while (kiter.nextMer()) {
70     if (is10x && kiter.bgnPosition() < 23)  continue;
71     if ((L->value(kiter.fmer()) > 0) ||
72         (L->value(kiter.rmer()) > 0))
73       found++;
74   }
75 
76   return(found);
77 }
78 
79 
80 static
81 void
processSequence(void * G,void * T,void * S)82 processSequence(void *G, void *T, void *S) {
83   lookupGlobal *g = (lookupGlobal *)G;
84   filterInput  *s = (filterInput  *)S;
85 
86   //  Count the number of kmers found in the database from either
87   //  seq1 or seq2.
88   //
89   //  If this is 10X Genomics reads, ignore counting in the first 23 bp of the seq1.
90 
91   s->nFound  = processSequence(g->lookupDBs[0], s->seq1, g->is10x);
92   s->nFound += processSequence(g->lookupDBs[0], s->seq2, false);
93 }
94 
95 
96 
97 static
98 void
outputSequence(compressedFileWriter * O,dnaSeq & seq,uint64 nFound)99 outputSequence(compressedFileWriter  *O,
100                dnaSeq                &seq,
101                uint64                 nFound) {
102 
103   if (O == nullptr)
104     return;
105 
106   if (seq.quals()[0] == 0)   outputFASTA(O->file(), seq.bases(),              seq.length(), 0, "%s nKmers=%lu", seq.ident(), nFound);
107   else                       outputFASTQ(O->file(), seq.bases(), seq.quals(), seq.length(),    "%s nKmers=%lu", seq.ident(), nFound);
108 }
109 
110 
111 static
112 void
outputSequence(void * G,void * S)113 outputSequence(void *G, void *S) {
114   lookupGlobal *g = (lookupGlobal *)G;
115   filterInput  *s = (filterInput  *)S;
116 
117   g->nReadsTotal++;
118 
119   //  Write output if:
120   //    'include' and    mers found.
121   //    'exclude' and no mers found.
122 
123   if (((s->nFound  > 0) && (g->reportType == lookupOp::opInclude)) ||
124       ((s->nFound == 0) && (g->reportType == lookupOp::opExclude))) {
125     g->nReadsFound++;
126 
127     outputSequence(g->outFile1, s->seq1, s->nFound);
128     outputSequence(g->outFile2, s->seq2, s->nFound);
129   }
130 
131   delete s;
132 }
133 
134 
135 
136 void
filter(lookupGlobal * g)137 filter(lookupGlobal *g) {
138 
139   if (g->is10x)
140     fprintf(stderr, "\nRunning in 10x mode. The first 23 bp of every sequence in %s will be ignored while looking up.\n", g->seqFile1->filename());
141 
142   sweatShop     *ss = new sweatShop(loadSequence, processSequence, outputSequence);
143 
144   ss->setLoaderQueueSize(10240);
145   ss->setNumberOfWorkers(omp_get_max_threads());
146   ss->setWriterQueueSize(omp_get_max_threads());
147 
148   ss->run(g, g->showProgress);
149 
150   delete ss;
151 
152   fprintf(stderr, "\nIncluding %lu reads (or read pairs) out of %lu.\n", g->nReadsTotal, g->nReadsFound);
153 }
154 
155