1 /* 2 * Copyright 2003-2021 The Music Player Daemon Project 3 * http://www.musicpd.org 4 * 5 * This program is free software; you can redistribute it and/or modify 6 * it under the terms of the GNU General Public License as published by 7 * the Free Software Foundation; either version 2 of the License, or 8 * (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * 15 * You should have received a copy of the GNU General Public License along 16 * with this program; if not, write to the Free Software Foundation, Inc., 17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 18 */ 19 20 #ifndef MPD_ASYNC_INPUT_STREAM_HXX 21 #define MPD_ASYNC_INPUT_STREAM_HXX 22 23 #include "InputStream.hxx" 24 #include "event/InjectEvent.hxx" 25 #include "util/HugeAllocator.hxx" 26 #include "util/CircularBuffer.hxx" 27 28 #include <exception> 29 30 /** 31 * Helper class for moving asynchronous (non-blocking) InputStream 32 * implementations to the I/O thread. Data is being read into a ring 33 * buffer, and that buffer is then consumed by another thread using 34 * the regular #InputStream API. 35 */ 36 class AsyncInputStream : public InputStream { 37 enum class SeekState : uint8_t { 38 NONE, SCHEDULED, PENDING 39 }; 40 41 InjectEvent deferred_resume; 42 InjectEvent deferred_seek; 43 44 HugeArray<uint8_t> allocation; 45 46 CircularBuffer<uint8_t> buffer; 47 const size_t resume_at; 48 49 bool open = true; 50 51 /** 52 * Is the connection currently paused? That happens when the 53 * buffer was getting too large. It will be unpaused when the 54 * buffer is below the threshold again. 55 */ 56 bool paused = false; 57 58 SeekState seek_state = SeekState::NONE; 59 60 /** 61 * The #Tag object ready to be requested via 62 * InputStream::ReadTag(). 63 */ 64 std::unique_ptr<Tag> tag; 65 66 offset_type seek_offset; 67 68 protected: 69 std::exception_ptr postponed_exception; 70 71 public: 72 AsyncInputStream(EventLoop &event_loop, const char *_url, 73 Mutex &_mutex, 74 size_t _buffer_size, 75 size_t _resume_at) noexcept; 76 77 ~AsyncInputStream() noexcept override; 78 GetEventLoop() const79 auto &GetEventLoop() const noexcept { 80 return deferred_resume.GetEventLoop(); 81 } 82 83 /* virtual methods from InputStream */ 84 void Check() final; 85 bool IsEOF() const noexcept final; 86 void Seek(std::unique_lock<Mutex> &lock, 87 offset_type new_offset) final; 88 std::unique_ptr<Tag> ReadTag() noexcept final; 89 bool IsAvailable() const noexcept final; 90 size_t Read(std::unique_lock<Mutex> &lock, 91 void *ptr, size_t read_size) final; 92 93 protected: 94 /** 95 * Pass an tag from the I/O thread to the client thread. 96 */ 97 void SetTag(std::unique_ptr<Tag> _tag) noexcept; 98 void ClearTag() noexcept; 99 100 void Pause() noexcept; 101 IsPaused() const102 bool IsPaused() const noexcept { 103 return paused; 104 } 105 106 /** 107 * Declare that the underlying stream was closed. We will 108 * continue feeding Read() calls from the buffer until it runs 109 * empty. 110 */ SetClosed()111 void SetClosed() noexcept { 112 open = false; 113 } 114 IsBufferEmpty() const115 bool IsBufferEmpty() const noexcept { 116 return buffer.empty(); 117 } 118 IsBufferFull() const119 bool IsBufferFull() const noexcept { 120 return buffer.IsFull(); 121 } 122 123 /** 124 * Determine how many bytes can be added to the buffer. 125 */ 126 [[gnu::pure]] GetBufferSpace() const127 size_t GetBufferSpace() const noexcept { 128 return buffer.GetSpace(); 129 } 130 PrepareWriteBuffer()131 CircularBuffer<uint8_t>::Range PrepareWriteBuffer() noexcept { 132 return buffer.Write(); 133 } 134 135 void CommitWriteBuffer(size_t nbytes) noexcept; 136 137 /** 138 * Append data to the buffer. The size must fit into the 139 * buffer; see GetBufferSpace(). 140 */ 141 void AppendToBuffer(const void *data, size_t append_size) noexcept; 142 143 /** 144 * Implement code here that will resume the stream after it 145 * has been paused due to full input buffer. 146 */ 147 virtual void DoResume() = 0; 148 149 /** 150 * The actual Seek() implementation. This virtual method will 151 * be called from within the I/O thread. When the operation 152 * is finished, call SeekDone() to notify the caller. 153 */ 154 virtual void DoSeek(offset_type new_offset) = 0; 155 IsSeekPending() const156 bool IsSeekPending() const noexcept { 157 return seek_state == SeekState::PENDING; 158 } 159 160 /** 161 * Call this after seeking has finished. It will notify the 162 * client thread. 163 */ 164 void SeekDone() noexcept; 165 166 private: 167 void Resume(); 168 169 /* for InjectEvent */ 170 void DeferredResume() noexcept; 171 void DeferredSeek() noexcept; 172 }; 173 174 #endif 175