1 /*
2  * Copyright 2011, Ben Langmead <langmea@cs.jhu.edu>
3  *
4  * This file is part of Bowtie 2.
5  *
6  * Bowtie 2 is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * Bowtie 2 is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Bowtie 2.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef OUTQ_H_
21 #define OUTQ_H_
22 
23 #include "assert_helpers.h"
24 #include "ds.h"
25 #include "sstring.h"
26 #include "read.h"
27 #include "threading.h"
28 #include "mem_ids.h"
29 #include <vector>
30 
/**
 * Encapsulates a list of lines of output.  If the earliest as-yet-unreported
 * read has id N and Bowtie 2 wants to write a record for read with id N+1, we
 * resize the lines_, started_ and finished_ lists to have at least 2 elements
 * (1 for N, 1 for N+1) so that the record for N+1 can be buffered in the 2nd
 * element.  When the user calls finishRead() for the read with id N, its
 * record is marked as finished and becomes eligible to be written out by
 * flush(), which writes already-finished lines starting from the earliest
 * as-yet-unreported read.
 */
38 class OutputQueue {
39 
40 	static const size_t NFLUSH_THRESH = 8;
41 
42 public:
43 
44 	OutputQueue(
45 		OutFileBuf& obuf,
46 		bool reorder,
47 		size_t nthreads,
48 		bool threadSafe,
49 		int perThreadBufSize,
50 		TReadId rdid = 0) :
obuf_(obuf)51 		obuf_(obuf),
52 		cur_(rdid),
53 		nfinished_(0),
54 		nflushed_(0),
55 		lines_(RES_CAT),
56 		started_(RES_CAT),
57 		finished_(RES_CAT),
58 		reorder_(reorder),
59 		threadSafe_(threadSafe),
60 		mutex_m(),
61 		nthreads_(nthreads),
62 		perThreadBuf(NULL),
63 		perThreadCounter(NULL),
64 		perThreadBufSize_(perThreadBufSize)
65 	{
66 		nstarted_=0;
67 		assert(nthreads_ <= 2 || threadSafe);
68 		if(!reorder)
69 		{
70 			perThreadBuf = new BTString*[nthreads_];
71 			perThreadCounter = new int[nthreads_];
72 			size_t i = 0;
73 			for(i=0;i<nthreads_;i++)
74 			{
75 				perThreadBuf[i] = new BTString[perThreadBufSize_];
76 				perThreadCounter[i] = 0;
77 			}
78 		}
79 	}
80 
~OutputQueue()81 	~OutputQueue() {
82 		if(perThreadBuf != NULL) {
83 			for (size_t i = 0; i < nthreads_; i++) {
84 				delete[] perThreadBuf[i];
85 			}
86 			delete[] perThreadBuf;
87 			delete[] perThreadCounter;
88 		}
89 	}
90 
91 	/**
92 	 * Caller is telling us that they're about to write output record(s) for
93 	 * the read with the given id.
94 	 */
95 	void beginRead(TReadId rdid, size_t threadId);
96 
97 	/**
98 	 * Writer is finished writing to
99 	 */
100 	void finishRead(const BTString& rec, TReadId rdid, size_t threadId);
101 
102 	/**
103 	 * Return the number of records currently being buffered.
104 	 */
size()105 	size_t size() const {
106 		return lines_.size();
107 	}
108 
109 	/**
110 	 * Return the number of records that have been flushed so far.
111 	 */
numFlushed()112 	TReadId numFlushed() const {
113 		return nflushed_;
114 	}
115 
116 	/**
117 	 * Return the number of records that have been started so far.
118 	 */
numStarted()119 	TReadId numStarted() const {
120 		return nstarted_;
121 	}
122 
123 	/**
124 	 * Return the number of records that have been finished so far.
125 	 */
numFinished()126 	TReadId numFinished() const {
127 		return nfinished_;
128 	}
129 
130 	/**
131 	 * Write already-committed lines starting from cur_.
132 	 */
133 	void flush(bool force = false, bool getLock = true);
134 
135 protected:
136 
137 	OutFileBuf&     obuf_;
138 	TReadId         cur_;
139 	std::atomic<TReadId> nstarted_;
140 	TReadId         nfinished_;
141 	TReadId         nflushed_;
142 	EList<BTString> lines_;
143 	EList<bool>     started_;
144 	EList<bool>     finished_;
145 	bool            reorder_;
146 	bool            threadSafe_;
147 	MUTEX_T         mutex_m;
148 
149 	// used for output read buffer
150 	size_t nthreads_;
151 	BTString** perThreadBuf;
152 	int* 		perThreadCounter;
153 	int perThreadBufSize_;
154 
155 private:
156 
157 	void flushImpl(bool force);
158 	void beginReadImpl(TReadId rdid, size_t threadId);
159 	void finishReadImpl(const BTString& rec, TReadId rdid, size_t threadId);
160 };
161 
162 class OutputQueueMark {
163 public:
OutputQueueMark(OutputQueue & q,const BTString & rec,TReadId rdid,size_t threadId)164 	OutputQueueMark(
165 		OutputQueue& q,
166 		const BTString& rec,
167 		TReadId rdid,
168 		size_t threadId) :
169 		q_(q),
170 		rec_(rec),
171 		rdid_(rdid),
172 		threadId_(threadId)
173 	{
174 		q_.beginRead(rdid, threadId);
175 	}
176 
~OutputQueueMark()177 	~OutputQueueMark() {
178 		q_.finishRead(rec_, rdid_, threadId_);
179 	}
180 
181 protected:
182 	OutputQueue& q_;
183 	const BTString& rec_;
184 	TReadId rdid_;
185 	size_t threadId_;
186 };
187 
188 #endif
189