1 //
2 // FIFOBuffer.h
3 //
4 // Library: Foundation
5 // Package: Core
6 // Module:  FIFOBuffer
7 //
8 // Definition of the FIFOBuffer class.
9 //
10 // Copyright (c) 2006, Applied Informatics Software Engineering GmbH.
11 // and Contributors.
12 //
13 // SPDX-License-Identifier:	BSL-1.0
14 //
15 
16 
17 #ifndef Foundation_FIFOBuffer_INCLUDED
18 #define Foundation_FIFOBuffer_INCLUDED
19 
20 
21 #include "Poco/Foundation.h"
22 #include "Poco/Exception.h"
23 #include "Poco/Buffer.h"
24 #include "Poco/BasicEvent.h"
25 #include "Poco/Mutex.h"
26 #include "Poco/Format.h"
27 
28 
29 namespace Poco {
30 
31 
32 template <class T>
33 class BasicFIFOBuffer
34 	/// A simple buffer class with support for re-entrant,
35 	/// FIFO-style read/write operations, as well as (optional)
36 	/// empty/non-empty/full (i.e. writable/readable) transition
37 	/// notifications. Buffer can be flagged with end-of-file and
38 	/// error flags, which renders it un-readable/writable.
39 	///
40 	/// Critical portions of code are protected by a recursive mutex.
41 	/// However, to achieve thread-safety in cases where multiple
42 	/// member function calls are involved and have to be atomic,
43 	/// the mutex must be locked externally.
44 	///
45 	/// Buffer size, as well as amount of unread data and
46 	/// available space introspections are supported as well.
47 	///
	/// This class is useful wherever FIFO functionality
	/// is needed.
50 {
51 public:
52 	typedef T Type;
53 
54 	mutable Poco::BasicEvent<bool> writable;
55 		/// Event indicating "writability" of the buffer,
56 		/// triggered as follows:
57 		///
58 		///	* when buffer transitions from non-full to full,
59 		///	  Writable event observers are notified, with
60 		///	  false value as the argument
61 		///
62 		///	* when buffer transitions from full to non-full,
63 		///	  Writable event observers are notified, with
64 		///	  true value as the argument
65 
66 	mutable Poco::BasicEvent<bool> readable;
67 		/// Event indicating "readability" of the buffer,
68 		/// triggered as follows:
69 		///
70 		///	* when buffer transitions from non-empty to empty,
71 		///	  Readable event observers are notified, with false
72 		///	  value as the argument
73 		///
74 		///	* when FIFOBuffer transitions from empty to non-empty,
75 		///	  Readable event observers are notified, with true value
76 		///	  as the argument
77 
78 	BasicFIFOBuffer(std::size_t size, bool notify = false):
_buffer(size)79 		_buffer(size),
80 		_begin(0),
81 		_used(0),
82 		_notify(notify),
83 		_eof(false),
84 		_error(false)
85 		/// Creates the FIFOBuffer.
86 	{
87 	}
88 
89 	BasicFIFOBuffer(T* pBuffer, std::size_t size, bool notify = false):
_buffer(pBuffer,size)90 		_buffer(pBuffer, size),
91 		_begin(0),
92 		_used(0),
93 		_notify(notify),
94 		_eof(false),
95 		_error(false)
96 		/// Creates the FIFOBuffer.
97 	{
98 	}
99 
100 	BasicFIFOBuffer(const T* pBuffer, std::size_t size, bool notify = false):
_buffer(pBuffer,size)101 		_buffer(pBuffer, size),
102 		_begin(0),
103 		_used(size),
104 		_notify(notify),
105 		_eof(false),
106 		_error(false)
107 		/// Creates the FIFOBuffer.
108 	{
109 	}
110 
~BasicFIFOBuffer()111 	~BasicFIFOBuffer()
112 		/// Destroys the FIFOBuffer.
113 	{
114 	}
115 
116 	void resize(std::size_t newSize, bool preserveContent = true)
117 		/// Resizes the buffer. If preserveContent is true,
118 		/// the content of the old buffer is preserved.
119 		/// New size can be larger or smaller than
120 		/// the current size, but it must not be 0.
121 		/// Additionally, if the new length is smaller
122 		/// than currently used length and preserveContent
123 		/// is true, InvalidAccessException is thrown.
124 	{
125 		Mutex::ScopedLock lock(_mutex);
126 
127 		if (preserveContent && (newSize < _used))
128 			throw InvalidAccessException("Can not resize FIFO without data loss.");
129 
130 		std::size_t usedBefore = _used;
131 		_buffer.resize(newSize, preserveContent);
132 		if (!preserveContent) _used = 0;
133 		if (_notify) notify(usedBefore);
134 	}
135 
peek(T * pBuffer,std::size_t length)136 	std::size_t peek(T* pBuffer, std::size_t length) const
137 		/// Peeks into the data currently in the FIFO
138 		/// without actually extracting it.
139 		/// If length is zero, the return is immediate.
140 		/// If length is greater than used length,
141 		/// it is substituted with the the current FIFO
142 		/// used length.
143 		///
144 		/// Returns the number of elements copied in the
145 		/// supplied buffer.
146 	{
147 		if (0 == length) return 0;
148 		Mutex::ScopedLock lock(_mutex);
149 		if (!isReadable()) return 0;
150 		if (length > _used) length = _used;
151 		std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T));
152 		return length;
153 	}
154 
155 	std::size_t peek(Poco::Buffer<T>& buffer, std::size_t length = 0) const
156 		/// Peeks into the data currently in the FIFO
157 		/// without actually extracting it.
158 		/// Resizes the supplied buffer to the size of
159 		/// data written to it. If length is not
160 		/// supplied by the caller or is greater than length
161 		/// of currently used data, the current FIFO used
162 		/// data length is substituted for it.
163 		///
164 		/// Returns the number of elements copied in the
165 		/// supplied buffer.
166 	{
167 		Mutex::ScopedLock lock(_mutex);
168 		if (!isReadable()) return 0;
169 		if (0 == length || length > _used) length = _used;
170 		buffer.resize(length);
171 		return peek(buffer.begin(), length);
172 	}
173 
read(T * pBuffer,std::size_t length)174 	std::size_t read(T* pBuffer, std::size_t length)
175 		/// Copies the data currently in the FIFO
176 		/// into the supplied buffer, which must be
177 		/// preallocated to at least the length size
178 		/// before calling this function.
179 		///
180 		/// Returns the size of the copied data.
181 	{
182 		if (0 == length) return 0;
183 		Mutex::ScopedLock lock(_mutex);
184 		if (!isReadable()) return 0;
185 		std::size_t usedBefore = _used;
186 		std::size_t readLen = peek(pBuffer, length);
187 		poco_assert (_used >= readLen);
188 		_used -= readLen;
189 		if (0 == _used) _begin = 0;
190 		else _begin += length;
191 
192 		if (_notify) notify(usedBefore);
193 
194 		return readLen;
195 	}
196 
197 	std::size_t read(Poco::Buffer<T>& buffer, std::size_t length = 0)
198 		/// Copies the data currently in the FIFO
199 		/// into the supplied buffer.
200 		/// Resizes the supplied buffer to the size of
201 		/// data written to it.
202 		///
203 		/// Returns the size of the copied data.
204 	{
205 		Mutex::ScopedLock lock(_mutex);
206 		if (!isReadable()) return 0;
207 		std::size_t usedBefore = _used;
208 		std::size_t readLen = peek(buffer, length);
209 		poco_assert (_used >= readLen);
210 		_used -= readLen;
211 		if (0 == _used) _begin = 0;
212 		else _begin += length;
213 
214 		if (_notify) notify(usedBefore);
215 
216 		return readLen;
217 	}
218 
write(const T * pBuffer,std::size_t length)219 	std::size_t write(const T* pBuffer, std::size_t length)
220 		/// Writes data from supplied buffer to the FIFO buffer.
221 		/// If there is no sufficient space for the whole
222 		/// buffer to be written, data up to available
223 		/// length is written.
224 		/// The length of data to be written is determined from the
225 		/// length argument. Function does nothing and returns zero
226 		/// if length argument is equal to zero.
227 		///
228 		/// Returns the length of data written.
229 	{
230 		if (0 == length) return 0;
231 
232 		Mutex::ScopedLock lock(_mutex);
233 
234 		if (!isWritable()) return 0;
235 
236 		if (_buffer.size() - (_begin + _used) < length)
237 		{
238 			std::memmove(_buffer.begin(), begin(), _used * sizeof(T));
239 			_begin = 0;
240 		}
241 
242 		std::size_t usedBefore = _used;
243 		std::size_t available =  _buffer.size() - _used - _begin;
244 		std::size_t len = length > available ? available : length;
245 		std::memcpy(begin() + _used, pBuffer, len * sizeof(T));
246 		_used += len;
247 		poco_assert (_used <= _buffer.size());
248 		if (_notify) notify(usedBefore);
249 
250 		return len;
251 	}
252 
253 	std::size_t write(const Buffer<T>& buffer, std::size_t length = 0)
254 		/// Writes data from supplied buffer to the FIFO buffer.
255 		/// If there is no sufficient space for the whole
256 		/// buffer to be written, data up to available
257 		/// length is written.
258 		/// The length of data to be written is determined from the
259 		/// length argument or buffer size (when length argument is
260 		/// default zero or greater than buffer size).
261 		///
262 		/// Returns the length of data written.
263 	{
264 		if (length == 0 || length > buffer.size())
265 			length = buffer.size();
266 
267 		return write(buffer.begin(), length);
268 	}
269 
size()270 	std::size_t size() const
271 		/// Returns the size of the buffer.
272 	{
273 		return _buffer.size();
274 	}
275 
used()276 	std::size_t used() const
277 		/// Returns the size of the used portion of the buffer.
278 	{
279 		return _used;
280 	}
281 
available()282 	std::size_t available() const
283 		/// Returns the size of the available portion of the buffer.
284 	{
285 		return size() - _used;
286 	}
287 
288 	void drain(std::size_t length = 0)
289 		/// Drains length number of elements from the buffer.
290 		/// If length is zero or greater than buffer current
291 		/// content length, buffer is emptied.
292 	{
293 		Mutex::ScopedLock lock(_mutex);
294 
295 		std::size_t usedBefore = _used;
296 
297 		if (0 == length || length >= _used)
298 		{
299 			_begin = 0;
300 			_used = 0;
301 		}
302 		else
303 		{
304 			_begin += length;
305 			_used -= length;
306 		}
307 
308 		if (_notify) notify(usedBefore);
309 	}
310 
copy(const T * ptr,std::size_t length)311 	void copy(const T* ptr, std::size_t length)
312 		/// Copies the supplied data to the buffer and adjusts
313 		/// the used buffer size.
314 	{
315 		poco_check_ptr(ptr);
316 		if (0 == length) return;
317 
318 		Mutex::ScopedLock lock(_mutex);
319 
320 		if (length > available())
321 			throw Poco::InvalidAccessException("Cannot extend buffer.");
322 
323 		if (!isWritable())
324 			throw Poco::InvalidAccessException("Buffer not writable.");
325 
326 		std::memcpy(begin() + _used, ptr, length * sizeof(T));
327 		std::size_t usedBefore = _used;
328 		_used += length;
329 		if (_notify) notify(usedBefore);
330 	}
331 
advance(std::size_t length)332 	void advance(std::size_t length)
333 		/// Advances buffer by length elements.
334 		/// Should be called AFTER the data
335 		/// was copied into the buffer.
336 	{
337 		Mutex::ScopedLock lock(_mutex);
338 
339 		if (length > available())
340 			throw Poco::InvalidAccessException("Cannot extend buffer.");
341 
342 		if (!isWritable())
343 			throw Poco::InvalidAccessException("Buffer not writable.");
344 
345 		if (_buffer.size() - (_begin + _used) < length)
346 		{
347 			std::memmove(_buffer.begin(), begin(), _used * sizeof(T));
348 			_begin = 0;
349 		}
350 
351 		std::size_t usedBefore = _used;
352 		_used += length;
353 		if (_notify) notify(usedBefore);
354 	}
355 
begin()356 	T* begin()
357 		/// Returns the pointer to the beginning of the buffer.
358 	{
359 		Mutex::ScopedLock lock(_mutex);
360 		if (_begin != 0)
361 		{
362 			// Move the data to the start of the buffer so begin() and next()
363 			// always return consistent pointers with each other and allow writing
364 			// to the end of the buffer.
365 			std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T));
366 			_begin = 0;
367 		}
368 		return _buffer.begin();
369 	}
370 
next()371 	T* next()
372 		/// Returns the pointer to the next available position in the buffer.
373 	{
374 		Mutex::ScopedLock lock(_mutex);
375 		return begin() + _used;
376 	}
377 
378 	T& operator [] (std::size_t index)
379 		/// Returns value at index position.
380 		/// Throws InvalidAccessException if index is larger than
381 		/// the last valid (used) buffer position.
382 	{
383 		Mutex::ScopedLock lock(_mutex);
384 		if (index >= _used)
385 			throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
386 
387 		return _buffer[_begin + index];
388 	}
389 
390 	const T& operator [] (std::size_t index) const
391 		/// Returns value at index position.
392 		/// Throws InvalidAccessException if index is larger than
393 		/// the last valid (used) buffer position.
394 	{
395 		Mutex::ScopedLock lock(_mutex);
396 		if (index >= _used)
397 			throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
398 
399 		return _buffer[_begin + index];
400 	}
401 
buffer()402 	const Buffer<T>& buffer() const
403 		/// Returns const reference to the underlying buffer.
404 	{
405 		return _buffer;
406 	}
407 
408 	void setError(bool error = true)
409 		/// Sets the error flag on the buffer and empties it.
410 		/// If notifications are enabled, they will be triggered
411 		/// if appropriate.
412 		///
413 		/// Setting error flag to true prevents reading and writing
414 		/// to the buffer; to re-enable FIFOBuffer for reading/writing,
415 		/// the error flag must be set to false.
416 	{
417 		if (error)
418 		{
419 			bool f = false;
420 			Mutex::ScopedLock lock(_mutex);
421 			if (error && isReadable() && _notify) readable.notify(this, f);
422 			if (error && isWritable() && _notify) writable.notify(this, f);
423 			_error = error;
424 			_used = 0;
425 		}
426 		else
427 		{
428 			bool t = true;
429 			Mutex::ScopedLock lock(_mutex);
430 			_error = false;
431 			if (_notify && !_eof) writable.notify(this, t);
432 		}
433 	}
434 
isValid()435 	bool isValid() const
436 		/// Returns true if error flag is not set on the buffer,
437 		/// otherwise returns false.
438 	{
439 		return !_error;
440 	}
441 
442 	void setEOF(bool eof = true)
443 		/// Sets end-of-file flag on the buffer.
444 		///
445 		/// Setting EOF flag to true prevents writing to the
446 		/// buffer; reading from the buffer will still be
447 		/// allowed until all data present in the buffer at the
448 		/// EOF set time is drained. After that, to re-enable
449 		/// FIFOBuffer for reading/writing, EOF must be
450 		/// set to false.
451 		///
452 		/// Setting EOF flag to false clears EOF state if it
453 		/// was previously set. If EOF was not set, it has no
454 		/// effect.
455 	{
456 		Mutex::ScopedLock lock(_mutex);
457 		bool flag = !eof;
458 		if (_notify) writable.notify(this, flag);
459 		_eof = eof;
460 	}
461 
hasEOF()462 	bool hasEOF() const
463 		/// Returns true if EOF flag has been set.
464 	{
465 		return _eof;
466 	}
467 
isEOF()468 	bool isEOF() const
469 		/// Returns true if EOF flag has been set and buffer is empty.
470 	{
471 		return isEmpty() && _eof;
472 	}
473 
isEmpty()474 	bool isEmpty() const
475 		/// Returns true is buffer is empty, false otherwise.
476 	{
477 		return 0 == _used;
478 	}
479 
isFull()480 	bool isFull() const
481 		/// Returns true is buffer is full, false otherwise.
482 	{
483 		return size() == _used;
484 	}
485 
isReadable()486 	bool isReadable() const
487 		/// Returns true if buffer contains data and is not
488 		/// in error state.
489 	{
490 		return !isEmpty() && isValid();
491 	}
492 
isWritable()493 	bool isWritable() const
494 		/// Returns true if buffer is not full and is not
495 		/// in error state.
496 	{
497 		return !isFull() && isValid() && !_eof;
498 	}
499 
500 	void setNotify(bool notify = true)
501 		/// Enables/disables notifications.
502 	{
503 		_notify = notify;
504 	}
505 
getNotify()506 	bool getNotify() const
507 		/// Returns true if notifications are enabled, false otherwise.
508 	{
509 		return _notify;
510 	}
511 
mutex()512 	Mutex& mutex()
513 		/// Returns reference to mutex.
514 	{
515 		return _mutex;
516 	}
517 
518 private:
notify(std::size_t usedBefore)519 	void notify(std::size_t usedBefore)
520 	{
521 		bool t = true, f = false;
522 		if (usedBefore == 0 && _used > 0)
523 			readable.notify(this, t);
524 		else if (usedBefore > 0 && 0 == _used)
525 			readable.notify(this, f);
526 
527 		if (usedBefore == _buffer.size() && _used < _buffer.size())
528 			writable.notify(this, t);
529 		else if (usedBefore < _buffer.size() && _used == _buffer.size())
530 			writable.notify(this, f);
531 	}
532 
	// Declared private with no definition visible in this header;
	// presumably to prohibit default construction and copying.
	BasicFIFOBuffer();
	BasicFIFOBuffer(const BasicFIFOBuffer&);
	BasicFIFOBuffer& operator = (const BasicFIFOBuffer&);

	// Underlying storage; its size is the FIFO capacity.
	Buffer<T>     _buffer;
	// Offset of the first unread element within _buffer.
	std::size_t   _begin;
	// Number of unread elements, starting at _begin.
	std::size_t   _used;
	// True if readable/writable events are fired on state transitions.
	bool          _notify;
	// Guards state changes; mutable so const members can lock it.
	mutable Mutex _mutex;
	// End-of-file flag; while set, the buffer is not writable.
	bool          _eof;
	// Error flag; while set, the buffer is neither readable nor writable.
	bool          _error;
544 };
545 
546 
547 //
548 // We provide an instantiation for char
549 //
550 typedef BasicFIFOBuffer<char> FIFOBuffer;
551 
552 
553 } // namespace Poco
554 
555 
556 #endif // Foundation_FIFOBuffer_INCLUDED
557