1 #include "result2stats.h"
2 
3 #include "AminoAcidLookupTables.h"
4 
5 #include "Debug.h"
6 #include "Util.h"
7 #include "FileUtil.h"
8 #include "itoa.h"
9 #include "Parameters.h"
10 
11 #ifdef OPENMP
12 #include <omp.h>
13 #endif
14 
15 enum {
16     STAT_LINECOUNT,
17     STAT_MEAN,
18     STAT_SUM,
19     STAT_DOOLITTLE,
20     STAT_CHARGES,
21     STAT_SEQLEN,
22     STAT_STRLEN,
23     STAT_FIRSTLINE,
24     STAT_UNKNOWN
25 };
26 
MapStatString(const std::string & str)27 int MapStatString(const std::string &str) {
28     int stat;
29     if (str == "linecount") {
30         stat = STAT_LINECOUNT;
31     } else if (str == "mean") {
32         stat = STAT_MEAN;
33     } else if (str == "sum") {
34         stat = STAT_SUM;
35     } else if (str == "doolittle") {
36         stat = STAT_DOOLITTLE;
37     } else if (str == "charges") {
38         stat = STAT_CHARGES;
39     } else if (str == "seqlen") {
40         stat = STAT_SEQLEN;
41     } else if (str == "strlen") {
42         stat = STAT_STRLEN;
43     } else if (str == "firstline") {
44         stat = STAT_FIRSTLINE;
45     } else {
46         stat = STAT_UNKNOWN;
47     }
48 
49     return stat;
50 }
51 
StatsComputer(const Parameters & par)52 StatsComputer::StatsComputer(const Parameters &par)
53         : stat(MapStatString(par.stat)),
54           queryDb(par.db1), queryDbIndex(par.db1Index),
55           targetDb(par.db2), targetDbIndex(par.db2Index), tsvOut(par.tsvOut) {
56     resultReader = new DBReader<unsigned int>(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
57     resultReader->open(DBReader<unsigned int>::LINEAR_ACCCESS);
58     this->threads = par.threads;
59 
60     const bool shouldCompress = tsvOut == false && par.compressed == true;
61     const int dbType = tsvOut == true ? Parameters::DBTYPE_OMIT_FILE : Parameters::DBTYPE_GENERIC_DB;
62     statWriter = new DBWriter(par.db4.c_str(), par.db4Index.c_str(), (unsigned int) par.threads, shouldCompress, dbType);
63     statWriter->open();
64 }
65 
66 float doolittle(const char*);
67 float charges(const char*);
seqlen(const char * sequence)68 size_t seqlen(const char* sequence) {
69     const char* p = sequence;
70     size_t length = 0;
71     while (p != NULL) {
72         if ((*p >= 'A' && *p < 'Z') || (*p >= 'a' && *p < 'z') || *p == '*') {
73             length++;
74         } else {
75             break;
76         }
77 
78         p++;
79     }
80     return length;
81 }
82 std::string firstline(const char*);
83 
run()84 int StatsComputer::run() {
85     switch (stat) {
86         case STAT_LINECOUNT:
87             return countNumberOfLines();
88         case STAT_MEAN:
89             return meanValue();
90         case STAT_SUM:
91             return sumValue();
92         case STAT_DOOLITTLE:
93             return sequenceWise<float>(&doolittle);
94         case STAT_CHARGES:
95             return sequenceWise<float>(&charges);
96         case STAT_SEQLEN:
97             return sequenceWise<size_t>(&seqlen);
98         case STAT_STRLEN:
99             return sequenceWise<size_t>(&std::strlen);
100         case STAT_FIRSTLINE:
101             return sequenceWise<std::string>(&firstline, true);
102         //case STAT_COMPOSITION:
103         //    return sequenceWise(&statsComputer::composition);
104         case STAT_UNKNOWN:
105         default:
106             Debug(Debug::ERROR) << "Unrecognized statistic: " << stat << "\n";
107             Debug(Debug::ERROR) << "Please define --stat parameter\n";
108 
109             EXIT(EXIT_FAILURE);
110     }
111 }
112 
~StatsComputer()113 StatsComputer::~StatsComputer() {
114     statWriter->close(tsvOut);
115     if (tsvOut) {
116         FileUtil::remove(statWriter->getIndexFileName());
117     }
118     resultReader->close();
119     delete statWriter;
120     delete resultReader;
121 }
122 
countNumberOfLines()123 int StatsComputer::countNumberOfLines() {
124     Debug::Progress progress(resultReader->getSize());
125 
126 #pragma omp parallel
127     {
128         int thread_idx = 0;
129 #ifdef OPENMP
130         thread_idx = omp_get_thread_num();
131 #endif
132 #pragma omp for schedule(dynamic, 100)
133         for (size_t id = 0; id < resultReader->getSize(); id++) {
134             progress.updateProgress();
135 
136             unsigned int lineCount(0);
137             std::string lineCountString;
138 
139             char *results = resultReader->getData(id, thread_idx);
140             while (*results != '\0') {
141                 if (*results == '\n') {
142                     lineCount++;
143                 }
144                 results++;
145             }
146 
147             lineCountString = SSTR(lineCount) + "\n";
148 
149             statWriter->writeData(lineCountString.c_str(), lineCountString.length(), resultReader->getDbKey(id), thread_idx, !tsvOut);
150         }
151     }
152     return 0;
153 }
154 
155 
meanValue()156 int StatsComputer::meanValue() {
157     Debug::Progress progress(resultReader->getSize());
158 
159 #pragma omp parallel
160     {
161         unsigned int thread_idx = 0;
162 #ifdef OPENMP
163         thread_idx = (unsigned int) omp_get_thread_num();
164 #endif
165 #pragma omp for schedule(dynamic, 100)
166         for (size_t id = 0; id < resultReader->getSize(); id++) {
167             progress.updateProgress();
168             char *results = resultReader->getData(id, thread_idx);
169 
170             double meanVal = 0.0;
171             size_t nbSeq = 0;
172             while (*results != '\0') {
173                 char *rest;
174                 errno = 0;
175                 double value = strtod(results, &rest);
176                 if (rest == results || errno != 0) {
177                     Debug(Debug::WARNING) << "Invalid value in entry " << id << "!\n";
178                     continue;
179                 }
180 
181                 meanVal += value;
182                 nbSeq++;
183                 results = Util::skipLine(results);
184             }
185 
186             std::string meanValString = SSTR(meanVal / std::max(static_cast<size_t>(1), nbSeq));
187             meanValString.append("\n");
188             statWriter->writeData(meanValString.c_str(), meanValString.length(), resultReader->getDbKey(id), thread_idx);
189         }
190     };
191     return 0;
192 }
193 
sumValue()194 int StatsComputer::sumValue() {
195     Debug::Progress progress(resultReader->getSize());
196 
197 #pragma omp parallel
198     {
199         unsigned int thread_idx = 0;
200 #ifdef OPENMP
201         thread_idx = (unsigned int) omp_get_thread_num();
202 #endif
203         char buffer[1024];
204 #pragma omp for schedule(dynamic, 10)
205         for (size_t id = 0; id < resultReader->getSize(); id++) {
206             progress.updateProgress();
207             char *results = resultReader->getData(id, thread_idx);
208 
209             size_t sum = 0;
210             while (*results != '\0') {
211                 char *rest;
212                 errno = 0;
213                 size_t value = strtoull(results, &rest, 10);
214                 if (rest == results || errno != 0) {
215                     Debug(Debug::WARNING) << "Invalid value in entry " << id << "!\n";
216                     continue;
217                 }
218 
219                 sum += value;
220                 results = Util::skipLine(results);
221             }
222 
223             std::string meanValString = SSTR(sum) + "\n";
224             char *end = Itoa::u64toa_sse2(sum, buffer);
225             *(end - 1) = '\n';
226 
227             statWriter->writeData(buffer, end - buffer, resultReader->getDbKey(id), thread_idx);
228         }
229     };
230     return 0;
231 }
232 
averageValueOnAminoAcids(const std::unordered_map<char,float> & values,const char * seq)233 float averageValueOnAminoAcids(const std::unordered_map<char, float> &values, const char *seq) {
234     const char *seqPointer = seq;
235     float ret = values.at('0') + values.at('1'); // C ter and N ter values
236     std::unordered_map<char, float>::const_iterator k;
237 
238     while (*seqPointer != '\0' && *seqPointer != '\n') {
239         if ((k = values.find(tolower(*seqPointer))) != values.end()) {
240             ret += k->second;
241         }
242 
243         seqPointer++;
244     }
245 
246     size_t seqLen = seqPointer - seq;
247     return ret / std::max(static_cast<size_t>(1), seqLen);
248 }
249 
250 
doolittle(const char * seq)251 float doolittle(const char *seq) {
252     static Doolittle doolitle;
253     return averageValueOnAminoAcids(doolitle.values, seq);
254 }
255 
256 
charges(const char * seq)257 float charges(const char *seq) {
258     static Charges charges;
259     return averageValueOnAminoAcids(charges.values, seq);
260 }
261 
firstline(const char * seq)262 std::string firstline(const char *seq) {
263     std::stringstream ss(seq);
264     std::string line;
265     std::getline(ss, line);
266     return line;
267 }
268 
269 template<typename T>
sequenceWise(typename PerSequence<T>::type call,bool onlyResultDb)270 int StatsComputer::sequenceWise(typename PerSequence<T>::type call, bool onlyResultDb) {
271     DBReader<unsigned int> *targetReader = NULL;
272     if (!onlyResultDb) {
273         targetReader = new DBReader<unsigned int>(targetDb.c_str(), targetDbIndex.c_str(), threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
274         targetReader->open(DBReader<unsigned int>::NOSORT);
275     }
276     Debug::Progress progress(resultReader->getSize());
277 
278 #pragma omp parallel
279     {
280         unsigned int thread_idx = 0;
281 #ifdef OPENMP
282         thread_idx = (unsigned int) omp_get_thread_num();
283 #endif
284         char dbKey[255 + 1];
285         std::string buffer;
286         buffer.reserve(1024);
287 
288 #pragma omp for schedule(dynamic, 10)
289         for (size_t id = 0; id < resultReader->getSize(); id++) {
290             progress.updateProgress();
291 
292             char *results = resultReader->getData(id, thread_idx);
293             if (onlyResultDb) {
294                 T stat = (*call)(results);
295                 buffer.append(SSTR(stat));
296                 buffer.append("\n");
297             } else {
298                 // for every hit
299                 int cnt = 0;
300                 while (*results != '\0') {
301                     Util::parseKey(results, dbKey);
302                     char *rest;
303                     const unsigned int key = (unsigned int) strtoul(dbKey, &rest, 10);
304                     if ((rest != dbKey && *rest != '\0') || errno == ERANGE) {
305                         Debug(Debug::WARNING) << "Invalid key in entry " << id << "!\n";
306                         continue;
307                     }
308 
309                     const size_t edgeId = targetReader->getId(key);
310                     const char *dbSeqData = targetReader->getData(edgeId, thread_idx);
311 
312                     T stat = (*call)(dbSeqData);
313                     buffer.append(SSTR(stat));
314                     buffer.append("\n");
315 
316                     results = Util::skipLine(results);
317                     cnt++;
318                 }
319             }
320             statWriter->writeData(buffer.c_str(), buffer.length(), resultReader->getDbKey(id), thread_idx);
321             buffer.clear();
322         }
323     };
324 
325     if(!onlyResultDb) {
326         targetReader->close();
327         delete targetReader;
328     }
329 
330     return 0;
331 }
332 
result2stats(int argc,const char ** argv,const Command & command)333 int result2stats(int argc, const char **argv, const Command &command) {
334     Parameters &par = Parameters::getInstance();
335     par.parseParameters(argc, argv, command, true, 0, 0);
336 
337     StatsComputer computeStats(par);
338     return computeStats.run();
339 }
340