1 /*
2  * Copyright 2003-2021 The Music Player Daemon Project
3  * http://www.musicpd.org
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #ifndef MPD_ASYNC_INPUT_STREAM_HXX
21 #define MPD_ASYNC_INPUT_STREAM_HXX
22 
23 #include "InputStream.hxx"
24 #include "event/InjectEvent.hxx"
25 #include "util/HugeAllocator.hxx"
26 #include "util/CircularBuffer.hxx"
27 
28 #include <exception>
29 
30 /**
31  * Helper class for moving asynchronous (non-blocking) InputStream
32  * implementations to the I/O thread.  Data is being read into a ring
33  * buffer, and that buffer is then consumed by another thread using
34  * the regular #InputStream API.
35  */
36 class AsyncInputStream : public InputStream {
37 	enum class SeekState : uint8_t {
38 		NONE, SCHEDULED, PENDING
39 	};
40 
41 	InjectEvent deferred_resume;
42 	InjectEvent deferred_seek;
43 
44 	HugeArray<uint8_t> allocation;
45 
46 	CircularBuffer<uint8_t> buffer;
47 	const size_t resume_at;
48 
49 	bool open = true;
50 
51 	/**
52 	 * Is the connection currently paused?  That happens when the
53 	 * buffer was getting too large.  It will be unpaused when the
54 	 * buffer is below the threshold again.
55 	 */
56 	bool paused = false;
57 
58 	SeekState seek_state = SeekState::NONE;
59 
60 	/**
61 	 * The #Tag object ready to be requested via
62 	 * InputStream::ReadTag().
63 	 */
64 	std::unique_ptr<Tag> tag;
65 
66 	offset_type seek_offset;
67 
68 protected:
69 	std::exception_ptr postponed_exception;
70 
71 public:
72 	AsyncInputStream(EventLoop &event_loop, const char *_url,
73 			 Mutex &_mutex,
74 			 size_t _buffer_size,
75 			 size_t _resume_at) noexcept;
76 
77 	~AsyncInputStream() noexcept override;
78 
GetEventLoop() const79 	auto &GetEventLoop() const noexcept {
80 		return deferred_resume.GetEventLoop();
81 	}
82 
83 	/* virtual methods from InputStream */
84 	void Check() final;
85 	bool IsEOF() const noexcept final;
86 	void Seek(std::unique_lock<Mutex> &lock,
87 		  offset_type new_offset) final;
88 	std::unique_ptr<Tag> ReadTag() noexcept final;
89 	bool IsAvailable() const noexcept final;
90 	size_t Read(std::unique_lock<Mutex> &lock,
91 		    void *ptr, size_t read_size) final;
92 
93 protected:
94 	/**
95 	 * Pass an tag from the I/O thread to the client thread.
96 	 */
97 	void SetTag(std::unique_ptr<Tag> _tag) noexcept;
98 	void ClearTag() noexcept;
99 
100 	void Pause() noexcept;
101 
IsPaused() const102 	bool IsPaused() const noexcept {
103 		return paused;
104 	}
105 
106 	/**
107 	 * Declare that the underlying stream was closed.  We will
108 	 * continue feeding Read() calls from the buffer until it runs
109 	 * empty.
110 	 */
SetClosed()111 	void SetClosed() noexcept {
112 		open = false;
113 	}
114 
IsBufferEmpty() const115 	bool IsBufferEmpty() const noexcept {
116 		return buffer.empty();
117 	}
118 
IsBufferFull() const119 	bool IsBufferFull() const noexcept {
120 		return buffer.IsFull();
121 	}
122 
123 	/**
124 	 * Determine how many bytes can be added to the buffer.
125 	 */
126 	[[gnu::pure]]
GetBufferSpace() const127 	size_t GetBufferSpace() const noexcept {
128 		return buffer.GetSpace();
129 	}
130 
PrepareWriteBuffer()131 	CircularBuffer<uint8_t>::Range PrepareWriteBuffer() noexcept {
132 		return buffer.Write();
133 	}
134 
135 	void CommitWriteBuffer(size_t nbytes) noexcept;
136 
137 	/**
138 	 * Append data to the buffer.  The size must fit into the
139 	 * buffer; see GetBufferSpace().
140 	 */
141 	void AppendToBuffer(const void *data, size_t append_size) noexcept;
142 
143 	/**
144 	 * Implement code here that will resume the stream after it
145 	 * has been paused due to full input buffer.
146 	 */
147 	virtual void DoResume() = 0;
148 
149 	/**
150 	 * The actual Seek() implementation.  This virtual method will
151 	 * be called from within the I/O thread.  When the operation
152 	 * is finished, call SeekDone() to notify the caller.
153 	 */
154 	virtual void DoSeek(offset_type new_offset) = 0;
155 
IsSeekPending() const156 	bool IsSeekPending() const noexcept {
157 		return seek_state == SeekState::PENDING;
158 	}
159 
160 	/**
161 	 * Call this after seeking has finished.  It will notify the
162 	 * client thread.
163 	 */
164 	void SeekDone() noexcept;
165 
166 private:
167 	void Resume();
168 
169 	/* for InjectEvent */
170 	void DeferredResume() noexcept;
171 	void DeferredSeek() noexcept;
172 };
173 
174 #endif
175