1 // 2 // FIFOBuffer.h 3 // 4 // Library: Foundation 5 // Package: Core 6 // Module: FIFOBuffer 7 // 8 // Definition of the FIFOBuffer class. 9 // 10 // Copyright (c) 2006, Applied Informatics Software Engineering GmbH. 11 // and Contributors. 12 // 13 // SPDX-License-Identifier: BSL-1.0 14 // 15 16 17 #ifndef Foundation_FIFOBuffer_INCLUDED 18 #define Foundation_FIFOBuffer_INCLUDED 19 20 21 #include "Poco/Foundation.h" 22 #include "Poco/Exception.h" 23 #include "Poco/Buffer.h" 24 #include "Poco/BasicEvent.h" 25 #include "Poco/Mutex.h" 26 #include "Poco/Format.h" 27 28 29 namespace Poco { 30 31 32 template <class T> 33 class BasicFIFOBuffer 34 /// A simple buffer class with support for re-entrant, 35 /// FIFO-style read/write operations, as well as (optional) 36 /// empty/non-empty/full (i.e. writable/readable) transition 37 /// notifications. Buffer can be flagged with end-of-file and 38 /// error flags, which renders it un-readable/writable. 39 /// 40 /// Critical portions of code are protected by a recursive mutex. 41 /// However, to achieve thread-safety in cases where multiple 42 /// member function calls are involved and have to be atomic, 43 /// the mutex must be locked externally. 44 /// 45 /// Buffer size, as well as amount of unread data and 46 /// available space introspections are supported as well. 47 /// 48 /// This class is useful anywhere where a FIFO functionality 49 /// is needed. 50 { 51 public: 52 typedef T Type; 53 54 mutable Poco::BasicEvent<bool> writable; 55 /// Event indicating "writability" of the buffer, 56 /// triggered as follows: 57 /// 58 /// * when buffer transitions from non-full to full, 59 /// Writable event observers are notified, with 60 /// false value as the argument 61 /// 62 /// * when buffer transitions from full to non-full, 63 /// Writable event observers are notified, with 64 /// true value as the argument 65 66 mutable Poco::BasicEvent<bool> readable; 67 /// Event indicating "readability" of the buffer, 68 /// triggered as follows: 69 /// 70 /// * when buffer transitions from non-empty to empty, 71 /// Readable event observers are notified, with false 72 /// value as the argument 73 /// 74 /// * when FIFOBuffer transitions from empty to non-empty, 75 /// Readable event observers are notified, with true value 76 /// as the argument 77 78 BasicFIFOBuffer(std::size_t size, bool notify = false): _buffer(size)79 _buffer(size), 80 _begin(0), 81 _used(0), 82 _notify(notify), 83 _eof(false), 84 _error(false) 85 /// Creates the FIFOBuffer. 86 { 87 } 88 89 BasicFIFOBuffer(T* pBuffer, std::size_t size, bool notify = false): _buffer(pBuffer,size)90 _buffer(pBuffer, size), 91 _begin(0), 92 _used(0), 93 _notify(notify), 94 _eof(false), 95 _error(false) 96 /// Creates the FIFOBuffer. 97 { 98 } 99 100 BasicFIFOBuffer(const T* pBuffer, std::size_t size, bool notify = false): _buffer(pBuffer,size)101 _buffer(pBuffer, size), 102 _begin(0), 103 _used(size), 104 _notify(notify), 105 _eof(false), 106 _error(false) 107 /// Creates the FIFOBuffer. 108 { 109 } 110 ~BasicFIFOBuffer()111 ~BasicFIFOBuffer() 112 /// Destroys the FIFOBuffer. 113 { 114 } 115 116 void resize(std::size_t newSize, bool preserveContent = true) 117 /// Resizes the buffer. If preserveContent is true, 118 /// the content of the old buffer is preserved. 119 /// New size can be larger or smaller than 120 /// the current size, but it must not be 0. 121 /// Additionally, if the new length is smaller 122 /// than currently used length and preserveContent 123 /// is true, InvalidAccessException is thrown. 124 { 125 Mutex::ScopedLock lock(_mutex); 126 127 if (preserveContent && (newSize < _used)) 128 throw InvalidAccessException("Can not resize FIFO without data loss."); 129 130 std::size_t usedBefore = _used; 131 _buffer.resize(newSize, preserveContent); 132 if (!preserveContent) _used = 0; 133 if (_notify) notify(usedBefore); 134 } 135 peek(T * pBuffer,std::size_t length)136 std::size_t peek(T* pBuffer, std::size_t length) const 137 /// Peeks into the data currently in the FIFO 138 /// without actually extracting it. 139 /// If length is zero, the return is immediate. 140 /// If length is greater than used length, 141 /// it is substituted with the the current FIFO 142 /// used length. 143 /// 144 /// Returns the number of elements copied in the 145 /// supplied buffer. 146 { 147 if (0 == length) return 0; 148 Mutex::ScopedLock lock(_mutex); 149 if (!isReadable()) return 0; 150 if (length > _used) length = _used; 151 std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T)); 152 return length; 153 } 154 155 std::size_t peek(Poco::Buffer<T>& buffer, std::size_t length = 0) const 156 /// Peeks into the data currently in the FIFO 157 /// without actually extracting it. 158 /// Resizes the supplied buffer to the size of 159 /// data written to it. If length is not 160 /// supplied by the caller or is greater than length 161 /// of currently used data, the current FIFO used 162 /// data length is substituted for it. 163 /// 164 /// Returns the number of elements copied in the 165 /// supplied buffer. 166 { 167 Mutex::ScopedLock lock(_mutex); 168 if (!isReadable()) return 0; 169 if (0 == length || length > _used) length = _used; 170 buffer.resize(length); 171 return peek(buffer.begin(), length); 172 } 173 read(T * pBuffer,std::size_t length)174 std::size_t read(T* pBuffer, std::size_t length) 175 /// Copies the data currently in the FIFO 176 /// into the supplied buffer, which must be 177 /// preallocated to at least the length size 178 /// before calling this function. 179 /// 180 /// Returns the size of the copied data. 181 { 182 if (0 == length) return 0; 183 Mutex::ScopedLock lock(_mutex); 184 if (!isReadable()) return 0; 185 std::size_t usedBefore = _used; 186 std::size_t readLen = peek(pBuffer, length); 187 poco_assert (_used >= readLen); 188 _used -= readLen; 189 if (0 == _used) _begin = 0; 190 else _begin += length; 191 192 if (_notify) notify(usedBefore); 193 194 return readLen; 195 } 196 197 std::size_t read(Poco::Buffer<T>& buffer, std::size_t length = 0) 198 /// Copies the data currently in the FIFO 199 /// into the supplied buffer. 200 /// Resizes the supplied buffer to the size of 201 /// data written to it. 202 /// 203 /// Returns the size of the copied data. 204 { 205 Mutex::ScopedLock lock(_mutex); 206 if (!isReadable()) return 0; 207 std::size_t usedBefore = _used; 208 std::size_t readLen = peek(buffer, length); 209 poco_assert (_used >= readLen); 210 _used -= readLen; 211 if (0 == _used) _begin = 0; 212 else _begin += length; 213 214 if (_notify) notify(usedBefore); 215 216 return readLen; 217 } 218 write(const T * pBuffer,std::size_t length)219 std::size_t write(const T* pBuffer, std::size_t length) 220 /// Writes data from supplied buffer to the FIFO buffer. 221 /// If there is no sufficient space for the whole 222 /// buffer to be written, data up to available 223 /// length is written. 224 /// The length of data to be written is determined from the 225 /// length argument. Function does nothing and returns zero 226 /// if length argument is equal to zero. 227 /// 228 /// Returns the length of data written. 229 { 230 if (0 == length) return 0; 231 232 Mutex::ScopedLock lock(_mutex); 233 234 if (!isWritable()) return 0; 235 236 if (_buffer.size() - (_begin + _used) < length) 237 { 238 std::memmove(_buffer.begin(), begin(), _used * sizeof(T)); 239 _begin = 0; 240 } 241 242 std::size_t usedBefore = _used; 243 std::size_t available = _buffer.size() - _used - _begin; 244 std::size_t len = length > available ? available : length; 245 std::memcpy(begin() + _used, pBuffer, len * sizeof(T)); 246 _used += len; 247 poco_assert (_used <= _buffer.size()); 248 if (_notify) notify(usedBefore); 249 250 return len; 251 } 252 253 std::size_t write(const Buffer<T>& buffer, std::size_t length = 0) 254 /// Writes data from supplied buffer to the FIFO buffer. 255 /// If there is no sufficient space for the whole 256 /// buffer to be written, data up to available 257 /// length is written. 258 /// The length of data to be written is determined from the 259 /// length argument or buffer size (when length argument is 260 /// default zero or greater than buffer size). 261 /// 262 /// Returns the length of data written. 263 { 264 if (length == 0 || length > buffer.size()) 265 length = buffer.size(); 266 267 return write(buffer.begin(), length); 268 } 269 size()270 std::size_t size() const 271 /// Returns the size of the buffer. 272 { 273 return _buffer.size(); 274 } 275 used()276 std::size_t used() const 277 /// Returns the size of the used portion of the buffer. 278 { 279 return _used; 280 } 281 available()282 std::size_t available() const 283 /// Returns the size of the available portion of the buffer. 284 { 285 return size() - _used; 286 } 287 288 void drain(std::size_t length = 0) 289 /// Drains length number of elements from the buffer. 290 /// If length is zero or greater than buffer current 291 /// content length, buffer is emptied. 292 { 293 Mutex::ScopedLock lock(_mutex); 294 295 std::size_t usedBefore = _used; 296 297 if (0 == length || length >= _used) 298 { 299 _begin = 0; 300 _used = 0; 301 } 302 else 303 { 304 _begin += length; 305 _used -= length; 306 } 307 308 if (_notify) notify(usedBefore); 309 } 310 copy(const T * ptr,std::size_t length)311 void copy(const T* ptr, std::size_t length) 312 /// Copies the supplied data to the buffer and adjusts 313 /// the used buffer size. 314 { 315 poco_check_ptr(ptr); 316 if (0 == length) return; 317 318 Mutex::ScopedLock lock(_mutex); 319 320 if (length > available()) 321 throw Poco::InvalidAccessException("Cannot extend buffer."); 322 323 if (!isWritable()) 324 throw Poco::InvalidAccessException("Buffer not writable."); 325 326 std::memcpy(begin() + _used, ptr, length * sizeof(T)); 327 std::size_t usedBefore = _used; 328 _used += length; 329 if (_notify) notify(usedBefore); 330 } 331 advance(std::size_t length)332 void advance(std::size_t length) 333 /// Advances buffer by length elements. 334 /// Should be called AFTER the data 335 /// was copied into the buffer. 336 { 337 Mutex::ScopedLock lock(_mutex); 338 339 if (length > available()) 340 throw Poco::InvalidAccessException("Cannot extend buffer."); 341 342 if (!isWritable()) 343 throw Poco::InvalidAccessException("Buffer not writable."); 344 345 if (_buffer.size() - (_begin + _used) < length) 346 { 347 std::memmove(_buffer.begin(), begin(), _used * sizeof(T)); 348 _begin = 0; 349 } 350 351 std::size_t usedBefore = _used; 352 _used += length; 353 if (_notify) notify(usedBefore); 354 } 355 begin()356 T* begin() 357 /// Returns the pointer to the beginning of the buffer. 358 { 359 Mutex::ScopedLock lock(_mutex); 360 if (_begin != 0) 361 { 362 // Move the data to the start of the buffer so begin() and next() 363 // always return consistent pointers with each other and allow writing 364 // to the end of the buffer. 365 std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T)); 366 _begin = 0; 367 } 368 return _buffer.begin(); 369 } 370 next()371 T* next() 372 /// Returns the pointer to the next available position in the buffer. 373 { 374 Mutex::ScopedLock lock(_mutex); 375 return begin() + _used; 376 } 377 378 T& operator [] (std::size_t index) 379 /// Returns value at index position. 380 /// Throws InvalidAccessException if index is larger than 381 /// the last valid (used) buffer position. 382 { 383 Mutex::ScopedLock lock(_mutex); 384 if (index >= _used) 385 throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1)); 386 387 return _buffer[_begin + index]; 388 } 389 390 const T& operator [] (std::size_t index) const 391 /// Returns value at index position. 392 /// Throws InvalidAccessException if index is larger than 393 /// the last valid (used) buffer position. 394 { 395 Mutex::ScopedLock lock(_mutex); 396 if (index >= _used) 397 throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1)); 398 399 return _buffer[_begin + index]; 400 } 401 buffer()402 const Buffer<T>& buffer() const 403 /// Returns const reference to the underlying buffer. 404 { 405 return _buffer; 406 } 407 408 void setError(bool error = true) 409 /// Sets the error flag on the buffer and empties it. 410 /// If notifications are enabled, they will be triggered 411 /// if appropriate. 412 /// 413 /// Setting error flag to true prevents reading and writing 414 /// to the buffer; to re-enable FIFOBuffer for reading/writing, 415 /// the error flag must be set to false. 416 { 417 if (error) 418 { 419 bool f = false; 420 Mutex::ScopedLock lock(_mutex); 421 if (error && isReadable() && _notify) readable.notify(this, f); 422 if (error && isWritable() && _notify) writable.notify(this, f); 423 _error = error; 424 _used = 0; 425 } 426 else 427 { 428 bool t = true; 429 Mutex::ScopedLock lock(_mutex); 430 _error = false; 431 if (_notify && !_eof) writable.notify(this, t); 432 } 433 } 434 isValid()435 bool isValid() const 436 /// Returns true if error flag is not set on the buffer, 437 /// otherwise returns false. 438 { 439 return !_error; 440 } 441 442 void setEOF(bool eof = true) 443 /// Sets end-of-file flag on the buffer. 444 /// 445 /// Setting EOF flag to true prevents writing to the 446 /// buffer; reading from the buffer will still be 447 /// allowed until all data present in the buffer at the 448 /// EOF set time is drained. After that, to re-enable 449 /// FIFOBuffer for reading/writing, EOF must be 450 /// set to false. 451 /// 452 /// Setting EOF flag to false clears EOF state if it 453 /// was previously set. If EOF was not set, it has no 454 /// effect. 455 { 456 Mutex::ScopedLock lock(_mutex); 457 bool flag = !eof; 458 if (_notify) writable.notify(this, flag); 459 _eof = eof; 460 } 461 hasEOF()462 bool hasEOF() const 463 /// Returns true if EOF flag has been set. 464 { 465 return _eof; 466 } 467 isEOF()468 bool isEOF() const 469 /// Returns true if EOF flag has been set and buffer is empty. 470 { 471 return isEmpty() && _eof; 472 } 473 isEmpty()474 bool isEmpty() const 475 /// Returns true is buffer is empty, false otherwise. 476 { 477 return 0 == _used; 478 } 479 isFull()480 bool isFull() const 481 /// Returns true is buffer is full, false otherwise. 482 { 483 return size() == _used; 484 } 485 isReadable()486 bool isReadable() const 487 /// Returns true if buffer contains data and is not 488 /// in error state. 489 { 490 return !isEmpty() && isValid(); 491 } 492 isWritable()493 bool isWritable() const 494 /// Returns true if buffer is not full and is not 495 /// in error state. 496 { 497 return !isFull() && isValid() && !_eof; 498 } 499 500 void setNotify(bool notify = true) 501 /// Enables/disables notifications. 502 { 503 _notify = notify; 504 } 505 getNotify()506 bool getNotify() const 507 /// Returns true if notifications are enabled, false otherwise. 508 { 509 return _notify; 510 } 511 mutex()512 Mutex& mutex() 513 /// Returns reference to mutex. 514 { 515 return _mutex; 516 } 517 518 private: notify(std::size_t usedBefore)519 void notify(std::size_t usedBefore) 520 { 521 bool t = true, f = false; 522 if (usedBefore == 0 && _used > 0) 523 readable.notify(this, t); 524 else if (usedBefore > 0 && 0 == _used) 525 readable.notify(this, f); 526 527 if (usedBefore == _buffer.size() && _used < _buffer.size()) 528 writable.notify(this, t); 529 else if (usedBefore < _buffer.size() && _used == _buffer.size()) 530 writable.notify(this, f); 531 } 532 533 BasicFIFOBuffer(); 534 BasicFIFOBuffer(const BasicFIFOBuffer&); 535 BasicFIFOBuffer& operator = (const BasicFIFOBuffer&); 536 537 Buffer<T> _buffer; 538 std::size_t _begin; 539 std::size_t _used; 540 bool _notify; 541 mutable Mutex _mutex; 542 bool _eof; 543 bool _error; 544 }; 545 546 547 // 548 // We provide an instantiation for char 549 // 550 typedef BasicFIFOBuffer<char> FIFOBuffer; 551 552 553 } // namespace Poco 554 555 556 #endif // Foundation_FIFOBuffer_INCLUDED 557