1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #ifndef FS_BUFFER_HPP
18 #define FS_BUFFER_HPP
19
20 #include <ndb_global.h>
21
22 #define DEBUG(x)
23
24 /**
25 * A circular data buffer to be used together with the FS
26 *
27 * One writer - Typically your block
28 * getWritePtr()
29 * updateWritePtr()
30 *
31 * One reader - Typically "thread" in your block sending stuff to NDBFS
32 * getReadPtr()
33 * updateReadPtr()
34 */
35 class FsBuffer {
36 public:
37 /**
38 * Default constructor
39 */
40 FsBuffer();
41
42 /**
43 * setup FsBuffer
44 *
45 * @param Buffer - Ptr to continuous memory
46 * @param Size - Buffer size in 32-bit words
47 * @param BlockSize - Size of block in 32-bit words
48 * @param MinRead - Min read size in 32-bit words
49 * Get rounded(down) to nearest multiple of block size.
50 * @param MaxRead - Max read size in 32-bit words
51 * Get rounded(down) to nearest multiple of block size.
52 * @param MaxWrite - Maximum write (into buffer) in 32-bit words
53 *
54 * @return NULL if everything is OK
55 * else A string describing problem
56 */
57 const char * setup(Uint32 * Buffer,
58 Uint32 Size,
59 Uint32 BlockSize = 128, // 512 bytes
60 Uint32 MinRead = 1024, // 4k
61 Uint32 MaxRead = 1024, // 4k
62 Uint32 MaxWrite = 1024); // 4k
63 /*
64 * @return NULL if everything is OK
65 * else A string describing problem
66 */
67 const char * valid() const;
68
69 Uint32 getBufferSize() const;
70 Uint32 getUsableSize() const;
71 Uint32 * getStart() const;
72
73 /**
74 * getReadPtr - Get pointer and size of data to send to FS
75 *
76 * @param ptr - Where to fetch data
77 * @param sz - How much data in 32-bit words
78 * @param eof - Is this the last fetch (only if return false)
79 *
80 * @return true - If there is data of size >= minread
81 * false - If there is can be data be if it is is < minread
82 * - else eof = true
83 */
84 bool getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * eof);
85
86 /**
87 * @note: sz must be equal to sz returned by getReadPtr
88 */
89 void updateReadPtr(Uint32 sz);
90
91 /**
92 *
93 * @note Must be followed by a updateWritePtr(no of words used)
94 */
95 bool getWritePtr(Uint32 ** ptr, Uint32 sz);
96
97 void updateWritePtr(Uint32 sz);
98
99 /**
100 * There will be no more writing to this buffer
101 */
102 void eof();
103
104 /**
105 * Getters for varibles
106 */
getMaxWrite() const107 Uint32 getMaxWrite() const { return m_maxWrite;}
getMinRead() const108 Uint32 getMinRead() const { return m_minRead;}
109
getFreeSize() const110 Uint32 getFreeSize() const { return m_free; }
111
112 /**
113 * reset
114 */
115 void reset();
116
117 private:
118
119 Uint32 m_free;
120 Uint32 m_readIndex;
121 Uint32 m_writeIndex;
122 Uint32 m_eof;
123 Uint32 * m_start;
124 Uint32 m_minRead;
125 Uint32 m_maxRead;
126 Uint32 m_maxWrite;
127 Uint32 m_size;
128
129 Uint32 * m_buffer;
130 Uint32 m_bufSize;
131 Uint32 m_blockSize;
132
133 void clear();
134 };
135
136 inline
FsBuffer()137 FsBuffer::FsBuffer()
138 {
139 clear();
140 }
141
142 inline
143 void
clear()144 FsBuffer::clear(){
145 m_minRead = m_maxRead = m_maxWrite = m_size = m_bufSize = m_free = 0;
146 m_buffer = m_start = 0;
147 }
148
149 static
150 Uint32 *
align(Uint32 * ptr,Uint32 alignment,bool downwards)151 align(Uint32 * ptr, Uint32 alignment, bool downwards){
152
153 const UintPtr a = (UintPtr)ptr;
154 const UintPtr b = a % alignment;
155
156 if(downwards){
157 return (Uint32 *)(a - b);
158 } else {
159 return (Uint32 *)(a + (b == 0 ? 0 : (alignment - b)));
160 }
161 }
162
163 inline
164 const char *
setup(Uint32 * Buffer,Uint32 Size,Uint32 Block,Uint32 MinRead,Uint32 MaxRead,Uint32 MaxWrite)165 FsBuffer::setup(Uint32 * Buffer,
166 Uint32 Size,
167 Uint32 Block,
168 Uint32 MinRead,
169 Uint32 MaxRead,
170 Uint32 MaxWrite)
171 {
172 clear();
173 m_buffer = Buffer;
174 m_bufSize = Size;
175 m_blockSize = Block;
176 if(Block == 0){
177 return valid();
178 }
179
180 m_minRead = (MinRead / Block) * Block;
181 m_maxRead = (MaxRead / Block) * Block;
182 m_maxWrite = MaxWrite;
183
184 m_start = align(Buffer, Block*4, false);
185 Uint32 * stop = align(Buffer + Size - MaxWrite, Block*4, true);
186 if(stop > m_start){
187 m_size = stop - m_start;
188 } else {
189 m_size = 0;
190 }
191
192 if(m_minRead == 0)
193 m_size = 0;
194 else
195 m_size = (m_size / m_minRead) * m_minRead;
196
197 #if 0
198 ndbout_c("Block = %d MinRead = %d -> %d", Block*4, MinRead*4, m_minRead*4);
199 ndbout_c("Block = %d MaxRead = %d -> %d", Block*4, MaxRead*4, m_maxRead*4);
200
201 ndbout_c("Buffer = %d -> %d", Buffer, m_start);
202 ndbout_c("Buffer = %d Size = %d MaxWrite = %d -> %d",
203 Buffer, Size*4, MaxWrite*4, m_size*4);
204 #endif
205
206 m_readIndex = m_writeIndex = m_eof = 0;
207 m_free = m_size;
208 return valid();
209 }
210
211 inline
212 void
reset()213 FsBuffer::reset()
214 {
215 m_readIndex = m_writeIndex = 0;
216 m_free = m_size;
217 m_eof = 0;
218 }
219
220 inline
221 const char *
valid() const222 FsBuffer::valid() const {
223 if(m_buffer == 0) return "Null pointer buffer";
224 if(m_bufSize == 0) return "Zero size buffer";
225 if(m_blockSize == 0) return "Zero block size";
226 if(m_minRead < m_blockSize) return "Min read less than block size";
227 if(m_maxRead < m_blockSize) return "Max read less than block size";
228 if(m_maxRead < m_minRead) return "Max read less than min read";
229 if(m_size == 0) return "Zero usable space";
230 return 0;
231 }
232
233 inline
234 Uint32
getBufferSize() const235 FsBuffer::getBufferSize() const {
236 return m_bufSize;
237 }
238
239 inline
240 Uint32
getUsableSize() const241 FsBuffer::getUsableSize() const {
242 return m_size;
243 }
244
245 inline
246 Uint32 *
getStart() const247 FsBuffer::getStart() const {
248 return m_start;
249 }
250
251 inline
252 bool
getReadPtr(Uint32 ** ptr,Uint32 * sz,bool * _eof)253 FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
254
255 Uint32 * Tp = m_start;
256 const Uint32 Tr = m_readIndex;
257 const Uint32 Tm = m_minRead;
258 const Uint32 Ts = m_size;
259 const Uint32 Tmw = m_maxRead;
260
261 Uint32 sz1 = m_size - m_free; // Used
262
263 if(sz1 >= Tm){
264 if(Tr + sz1 > Ts)
265 sz1 = (Ts - Tr);
266
267 if(sz1 > Tmw)
268 * sz = Tmw;
269 else
270 * sz = sz1 - (sz1 % Tm);
271
272 * ptr = &Tp[Tr];
273
274 DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d",
275 Tr, Tmw, Ts, Tm, sz1, * sz));
276
277 return true;
278 }
279
280 if(!m_eof){
281 * _eof = false;
282
283 DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> false",
284 Tr, Tmw, Ts, Tm, sz1));
285
286 return false;
287 }
288
289 * sz = sz1;
290 * _eof = true;
291 * ptr = &Tp[Tr];
292
293 DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d eof",
294 Tr, Tmw, Ts, Tm, sz1, * sz));
295
296 return false;
297 }
298
299 inline
300 void
updateReadPtr(Uint32 sz)301 FsBuffer::updateReadPtr(Uint32 sz){
302 const Uint32 Tr = m_readIndex;
303 const Uint32 Ts = m_size;
304
305 m_free += sz;
306 m_readIndex = (Tr + sz) % Ts;
307 }
308
309 inline
310 bool
getWritePtr(Uint32 ** ptr,Uint32 sz)311 FsBuffer::getWritePtr(Uint32 ** ptr, Uint32 sz){
312 assert(sz <= m_maxWrite);
313 Uint32 * Tp = m_start;
314 const Uint32 Tw = m_writeIndex;
315 const Uint32 sz1 = m_free;
316
317 if(sz1 > sz){ // Note at least 1 word of slack
318 * ptr = &Tp[Tw];
319
320 DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> true",
321 sz, Tw, sz1));
322 return true;
323 }
324
325 DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> false",
326 sz, Tw, sz1));
327
328 return false;
329 }
330
331 inline
332 void
updateWritePtr(Uint32 sz)333 FsBuffer::updateWritePtr(Uint32 sz){
334 assert(sz <= m_maxWrite);
335 Uint32 * Tp = m_start;
336 const Uint32 Tw = m_writeIndex;
337 const Uint32 Ts = m_size;
338
339 const Uint32 Tnew = (Tw + sz);
340 m_free -= sz;
341 if(Tnew < Ts){
342 m_writeIndex = Tnew;
343 DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
344 sz, m_writeIndex));
345 return;
346 }
347
348 memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2);
349 m_writeIndex = Tnew - Ts;
350 DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
351 sz, m_writeIndex));
352 }
353
354 inline
355 void
eof()356 FsBuffer::eof(){
357 m_eof = 1;
358 }
359
360 #endif
361