1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #ifndef FS_BUFFER_HPP
18 #define FS_BUFFER_HPP
19 
20 #include <ndb_global.h>
21 
22 #define DEBUG(x)
23 
24 /**
25  * A circular data buffer to be used together with the FS
26  *
27  * One writer - Typically your block
28  *   getWritePtr()
29  *   updateWritePtr()
30  *
31  * One reader - Typically "thread" in your block sending stuff to NDBFS
32  *   getReadPtr()
33  *   updateReadPtr()
34  */
35 class FsBuffer {
36 public:
37   /**
38    * Default constructor
39    */
40   FsBuffer();
41 
42   /**
43    * setup FsBuffer
44    *
45    * @param Buffer    - Ptr to continuous memory
46    * @param Size      - Buffer size in 32-bit words
47    * @param BlockSize - Size of block in 32-bit words
48    * @param MinRead   - Min read size in 32-bit words
49    *                    Get rounded(down) to nearest multiple of block size.
50    * @param MaxRead   - Max read size in 32-bit words
51    *                    Get rounded(down) to nearest multiple of block size.
52    * @param MaxWrite  - Maximum write (into buffer) in 32-bit words
53    *
54    * @return NULL if everything is OK
55    *    else A string describing problem
56    */
57   const char * setup(Uint32 * Buffer,
58 		     Uint32 Size,
59 		     Uint32 BlockSize = 128,   // 512 bytes
60 		     Uint32 MinRead   = 1024,  // 4k
61 		     Uint32 MaxRead   = 1024,  // 4k
62 		     Uint32 MaxWrite  = 1024); // 4k
63   /*
64    * @return NULL if everything is OK
65    *    else A string describing problem
66    */
67   const char * valid() const;
68 
69   Uint32 getBufferSize() const;
70   Uint32 getUsableSize() const;
71   Uint32 * getStart() const;
72 
73   /**
74    * getReadPtr - Get pointer and size of data to send to FS
75    *
76    * @param ptr - Where to fetch data
77    * @param sz  - How much data in 32-bit words
78    * @param eof - Is this the last fetch (only if return false)
79    *
80    * @return true  - If there is data of size >= minread
81    *         false - If there is can be data be if it is is < minread
82    *               - else eof = true
83    */
84   bool getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * eof);
85 
86   /**
87    * @note: sz must be equal to sz returned by getReadPtr
88    */
89   void updateReadPtr(Uint32 sz);
90 
91   /**
92    *
93    * @note Must be followed by a updateWritePtr(no of words used)
94    */
95   bool getWritePtr(Uint32 ** ptr, Uint32 sz);
96 
97   void updateWritePtr(Uint32 sz);
98 
99   /**
100    * There will be no more writing to this buffer
101    */
102   void eof();
103 
104   /**
105    * Getters for varibles
106    */
getMaxWrite() const107   Uint32 getMaxWrite() const { return m_maxWrite;}
getMinRead() const108   Uint32 getMinRead() const { return m_minRead;}
109 
getFreeSize() const110   Uint32 getFreeSize() const { return m_free; }
111 
112   /**
113    * reset
114    */
115   void reset();
116 
117 private:
118 
119   Uint32 m_free;
120   Uint32 m_readIndex;
121   Uint32 m_writeIndex;
122   Uint32 m_eof;
123   Uint32 * m_start;
124   Uint32 m_minRead;
125   Uint32 m_maxRead;
126   Uint32 m_maxWrite;
127   Uint32 m_size;
128 
129   Uint32 * m_buffer;
130   Uint32 m_bufSize;
131   Uint32 m_blockSize;
132 
133   void clear();
134 };
135 
136 inline
FsBuffer()137 FsBuffer::FsBuffer()
138 {
139   clear();
140 }
141 
142 inline
143 void
clear()144 FsBuffer::clear(){
145   m_minRead = m_maxRead = m_maxWrite = m_size = m_bufSize = m_free = 0;
146   m_buffer = m_start = 0;
147 }
148 
149 static
150 Uint32 *
align(Uint32 * ptr,Uint32 alignment,bool downwards)151 align(Uint32 * ptr, Uint32 alignment, bool downwards){
152 
153   const UintPtr a = (UintPtr)ptr;
154   const UintPtr b = a % alignment;
155 
156   if(downwards){
157     return (Uint32 *)(a - b);
158   } else {
159     return (Uint32 *)(a + (b == 0 ? 0 : (alignment - b)));
160   }
161 }
162 
163 inline
164 const char *
setup(Uint32 * Buffer,Uint32 Size,Uint32 Block,Uint32 MinRead,Uint32 MaxRead,Uint32 MaxWrite)165 FsBuffer::setup(Uint32 * Buffer,
166 		Uint32 Size,
167 		Uint32 Block,
168 		Uint32 MinRead,
169 		Uint32 MaxRead,
170 		Uint32 MaxWrite)
171 {
172   clear();
173   m_buffer    = Buffer;
174   m_bufSize   = Size;
175   m_blockSize = Block;
176   if(Block == 0){
177     return valid();
178   }
179 
180   m_minRead  = (MinRead / Block) * Block;
181   m_maxRead  = (MaxRead / Block) * Block;
182   m_maxWrite = MaxWrite;
183 
184   m_start = align(Buffer, Block*4, false);
185   Uint32 * stop = align(Buffer + Size - MaxWrite, Block*4, true);
186   if(stop > m_start){
187     m_size = stop - m_start;
188   } else {
189     m_size = 0;
190   }
191 
192   if(m_minRead == 0)
193     m_size = 0;
194   else
195     m_size = (m_size / m_minRead) * m_minRead;
196 
197 #if 0
198   ndbout_c("Block = %d MinRead = %d -> %d", Block*4, MinRead*4, m_minRead*4);
199   ndbout_c("Block = %d MaxRead = %d -> %d", Block*4, MaxRead*4, m_maxRead*4);
200 
201   ndbout_c("Buffer = %d -> %d", Buffer, m_start);
202   ndbout_c("Buffer = %d Size = %d MaxWrite = %d -> %d",
203 	   Buffer, Size*4, MaxWrite*4, m_size*4);
204 #endif
205 
206   m_readIndex = m_writeIndex = m_eof = 0;
207   m_free = m_size;
208   return valid();
209 }
210 
211 inline
212 void
reset()213 FsBuffer::reset()
214 {
215   m_readIndex = m_writeIndex = 0;
216   m_free = m_size;
217   m_eof = 0;
218 }
219 
220 inline
221 const char *
valid() const222 FsBuffer::valid() const {
223   if(m_buffer  == 0) return "Null pointer buffer";
224   if(m_bufSize == 0) return "Zero size buffer";
225   if(m_blockSize == 0) return "Zero block size";
226   if(m_minRead < m_blockSize) return "Min read less than block size";
227   if(m_maxRead < m_blockSize) return "Max read less than block size";
228   if(m_maxRead < m_minRead) return "Max read less than min read";
229   if(m_size == 0) return "Zero usable space";
230   return 0;
231 }
232 
233 inline
234 Uint32
getBufferSize() const235 FsBuffer::getBufferSize() const {
236   return m_bufSize;
237 }
238 
239 inline
240 Uint32
getUsableSize() const241 FsBuffer::getUsableSize() const {
242   return m_size;
243 }
244 
245 inline
246 Uint32 *
getStart() const247 FsBuffer::getStart() const {
248   return m_start;
249 }
250 
251 inline
252 bool
getReadPtr(Uint32 ** ptr,Uint32 * sz,bool * _eof)253 FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
254 
255   Uint32 * Tp = m_start;
256   const Uint32 Tr = m_readIndex;
257   const Uint32 Tm = m_minRead;
258   const Uint32 Ts = m_size;
259   const Uint32 Tmw = m_maxRead;
260 
261   Uint32 sz1 = m_size - m_free; // Used
262 
263   if(sz1 >= Tm){
264     if(Tr + sz1 > Ts)
265       sz1 = (Ts - Tr);
266 
267     if(sz1 > Tmw)
268       * sz = Tmw;
269     else
270       * sz = sz1 - (sz1 % Tm);
271 
272     * ptr = &Tp[Tr];
273 
274     DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d",
275 		   Tr, Tmw, Ts, Tm, sz1, * sz));
276 
277     return true;
278   }
279 
280   if(!m_eof){
281     * _eof = false;
282 
283     DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> false",
284 		   Tr, Tmw, Ts, Tm, sz1));
285 
286     return false;
287   }
288 
289   * sz = sz1;
290   * _eof = true;
291   * ptr = &Tp[Tr];
292 
293   DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d eof",
294 		 Tr, Tmw, Ts, Tm, sz1, * sz));
295 
296   return false;
297 }
298 
299 inline
300 void
updateReadPtr(Uint32 sz)301 FsBuffer::updateReadPtr(Uint32 sz){
302   const Uint32 Tr = m_readIndex;
303   const Uint32 Ts = m_size;
304 
305   m_free += sz;
306   m_readIndex = (Tr + sz) % Ts;
307 }
308 
309 inline
310 bool
getWritePtr(Uint32 ** ptr,Uint32 sz)311 FsBuffer::getWritePtr(Uint32 ** ptr, Uint32 sz){
312   assert(sz <= m_maxWrite);
313   Uint32 * Tp = m_start;
314   const Uint32 Tw = m_writeIndex;
315   const Uint32 sz1 = m_free;
316 
317   if(sz1 > sz){ // Note at least 1 word of slack
318     * ptr = &Tp[Tw];
319 
320     DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> true",
321 		   sz, Tw, sz1));
322     return true;
323   }
324 
325   DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> false",
326 		 sz, Tw, sz1));
327 
328   return false;
329 }
330 
331 inline
332 void
updateWritePtr(Uint32 sz)333 FsBuffer::updateWritePtr(Uint32 sz){
334   assert(sz <= m_maxWrite);
335   Uint32 * Tp = m_start;
336   const Uint32 Tw = m_writeIndex;
337   const Uint32 Ts = m_size;
338 
339   const Uint32 Tnew = (Tw + sz);
340   m_free -= sz;
341   if(Tnew < Ts){
342     m_writeIndex = Tnew;
343     DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
344                    sz, m_writeIndex));
345     return;
346   }
347 
348   memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2);
349   m_writeIndex = Tnew - Ts;
350   DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
351                  sz, m_writeIndex));
352 }
353 
354 inline
355 void
eof()356 FsBuffer::eof(){
357   m_eof = 1;
358 }
359 
360 #endif
361