1 /* 2 * Copyright 2011, Ben Langmead <langmea@cs.jhu.edu> 3 * 4 * This file is part of Bowtie 2. 5 * 6 * Bowtie 2 is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * Bowtie 2 is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with Bowtie 2. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #ifndef OUTQ_H_ 21 #define OUTQ_H_ 22 23 #include "assert_helpers.h" 24 #include "ds.h" 25 #include "sstring.h" 26 #include "read.h" 27 #include "threading.h" 28 #include "mem_ids.h" 29 #include <vector> 30 31 /** 32 * Encapsulates a list of lines of output. If the earliest as-yet-unreported 33 * read has id N and Bowtie 2 wants to write a record for read with id N+1, we 34 * resize the lines_ and committed_ lists to have at least 2 elements (1 for N, 35 * 1 for N+1) and return the BTString * associated with the 2nd element. When 36 * the user calls commit() for the read with id N, 37 */ 38 class OutputQueue { 39 40 static const size_t NFLUSH_THRESH = 8; 41 42 public: 43 44 OutputQueue( 45 OutFileBuf& obuf, 46 bool reorder, 47 size_t nthreads, 48 bool threadSafe, 49 int perThreadBufSize, 50 TReadId rdid = 0) : obuf_(obuf)51 obuf_(obuf), 52 cur_(rdid), 53 nfinished_(0), 54 nflushed_(0), 55 lines_(RES_CAT), 56 started_(RES_CAT), 57 finished_(RES_CAT), 58 reorder_(reorder), 59 threadSafe_(threadSafe), 60 mutex_m(), 61 nthreads_(nthreads), 62 perThreadBuf(NULL), 63 perThreadCounter(NULL), 64 perThreadBufSize_(perThreadBufSize) 65 { 66 nstarted_=0; 67 assert(nthreads_ <= 2 || threadSafe); 68 if(!reorder) 69 { 70 perThreadBuf = new BTString*[nthreads_]; 71 perThreadCounter = new int[nthreads_]; 72 size_t i = 0; 73 for(i=0;i<nthreads_;i++) 74 { 75 perThreadBuf[i] = new BTString[perThreadBufSize_]; 76 perThreadCounter[i] = 0; 77 } 78 } 79 } 80 ~OutputQueue()81 ~OutputQueue() { 82 if(perThreadBuf != NULL) { 83 for (size_t i = 0; i < nthreads_; i++) { 84 delete[] perThreadBuf[i]; 85 } 86 delete[] perThreadBuf; 87 delete[] perThreadCounter; 88 } 89 } 90 91 /** 92 * Caller is telling us that they're about to write output record(s) for 93 * the read with the given id. 94 */ 95 void beginRead(TReadId rdid, size_t threadId); 96 97 /** 98 * Writer is finished writing to 99 */ 100 void finishRead(const BTString& rec, TReadId rdid, size_t threadId); 101 102 /** 103 * Return the number of records currently being buffered. 104 */ size()105 size_t size() const { 106 return lines_.size(); 107 } 108 109 /** 110 * Return the number of records that have been flushed so far. 111 */ numFlushed()112 TReadId numFlushed() const { 113 return nflushed_; 114 } 115 116 /** 117 * Return the number of records that have been started so far. 118 */ numStarted()119 TReadId numStarted() const { 120 return nstarted_; 121 } 122 123 /** 124 * Return the number of records that have been finished so far. 125 */ numFinished()126 TReadId numFinished() const { 127 return nfinished_; 128 } 129 130 /** 131 * Write already-committed lines starting from cur_. 132 */ 133 void flush(bool force = false, bool getLock = true); 134 135 protected: 136 137 OutFileBuf& obuf_; 138 TReadId cur_; 139 std::atomic<TReadId> nstarted_; 140 TReadId nfinished_; 141 TReadId nflushed_; 142 EList<BTString> lines_; 143 EList<bool> started_; 144 EList<bool> finished_; 145 bool reorder_; 146 bool threadSafe_; 147 MUTEX_T mutex_m; 148 149 // used for output read buffer 150 size_t nthreads_; 151 BTString** perThreadBuf; 152 int* perThreadCounter; 153 int perThreadBufSize_; 154 155 private: 156 157 void flushImpl(bool force); 158 void beginReadImpl(TReadId rdid, size_t threadId); 159 void finishReadImpl(const BTString& rec, TReadId rdid, size_t threadId); 160 }; 161 162 class OutputQueueMark { 163 public: OutputQueueMark(OutputQueue & q,const BTString & rec,TReadId rdid,size_t threadId)164 OutputQueueMark( 165 OutputQueue& q, 166 const BTString& rec, 167 TReadId rdid, 168 size_t threadId) : 169 q_(q), 170 rec_(rec), 171 rdid_(rdid), 172 threadId_(threadId) 173 { 174 q_.beginRead(rdid, threadId); 175 } 176 ~OutputQueueMark()177 ~OutputQueueMark() { 178 q_.finishRead(rec_, rdid_, threadId_); 179 } 180 181 protected: 182 OutputQueue& q_; 183 const BTString& rec_; 184 TReadId rdid_; 185 size_t threadId_; 186 }; 187 188 #endif 189