1 /*
2  * Copyright 2003-2021 The Music Player Daemon Project
3  * http://www.musicpd.org
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #include "AsyncInputStream.hxx"
21 #include "CondHandler.hxx"
22 #include "tag/Tag.hxx"
23 #include "thread/Cond.hxx"
24 #include "event/Loop.hxx"
25 
26 #include <cassert>
27 #include <stdexcept>
28 
29 #include <string.h>
30 
AsyncInputStream(EventLoop & event_loop,const char * _url,Mutex & _mutex,size_t _buffer_size,size_t _resume_at)31 AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
32 				   Mutex &_mutex,
33 				   size_t _buffer_size,
34 				   size_t _resume_at) noexcept
35 	:InputStream(_url, _mutex),
36 	 deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
37 	 deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
38 	 allocation(_buffer_size),
39 	 buffer(&allocation.front(), allocation.size()),
40 	 resume_at(_resume_at)
41 {
42 	allocation.ForkCow(false);
43 }
44 
~AsyncInputStream()45 AsyncInputStream::~AsyncInputStream() noexcept
46 {
47 	buffer.Clear();
48 }
49 
50 void
SetTag(std::unique_ptr<Tag> _tag)51 AsyncInputStream::SetTag(std::unique_ptr<Tag> _tag) noexcept
52 {
53 	tag = std::move(_tag);
54 }
55 
56 void
ClearTag()57 AsyncInputStream::ClearTag() noexcept
58 {
59 	tag.reset();
60 }
61 
62 void
Pause()63 AsyncInputStream::Pause() noexcept
64 {
65 	assert(GetEventLoop().IsInside());
66 
67 	paused = true;
68 }
69 
70 inline void
Resume()71 AsyncInputStream::Resume()
72 {
73 	assert(GetEventLoop().IsInside());
74 
75 	if (paused) {
76 		paused = false;
77 
78 		DoResume();
79 	}
80 }
81 
82 void
Check()83 AsyncInputStream::Check()
84 {
85 	if (postponed_exception)
86 		std::rethrow_exception(std::exchange(postponed_exception,
87 						     std::exception_ptr()));
88 }
89 
90 bool
IsEOF() const91 AsyncInputStream::IsEOF() const noexcept
92 {
93 	return (KnownSize() && offset >= size) ||
94 		(!open && buffer.empty());
95 }
96 
97 void
Seek(std::unique_lock<Mutex> & lock,offset_type new_offset)98 AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
99 		       offset_type new_offset)
100 {
101 	assert(IsReady());
102 	assert(seek_state == SeekState::NONE);
103 
104 	if (new_offset == offset)
105 		/* no-op */
106 		return;
107 
108 	if (!IsSeekable())
109 		throw std::runtime_error("Not seekable");
110 
111 	/* check if we can fast-forward the buffer */
112 
113 	while (new_offset > offset) {
114 		auto r = buffer.Read();
115 		if (r.empty())
116 			break;
117 
118 		const size_t nbytes =
119 			new_offset - offset < (offset_type)r.size
120 					       ? new_offset - offset
121 					       : r.size;
122 
123 		buffer.Consume(nbytes);
124 		offset += nbytes;
125 	}
126 
127 	if (new_offset == offset)
128 		return;
129 
130 	/* no: ask the implementation to seek */
131 
132 	seek_offset = new_offset;
133 	seek_state = SeekState::SCHEDULED;
134 
135 	deferred_seek.Schedule();
136 
137 	CondInputStreamHandler cond_handler;
138 	const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
139 	cond_handler.cond.wait(lock,
140 			       [this]{ return seek_state == SeekState::NONE; });
141 
142 	Check();
143 }
144 
145 void
SeekDone()146 AsyncInputStream::SeekDone() noexcept
147 {
148 	assert(GetEventLoop().IsInside());
149 	assert(IsSeekPending());
150 
151 	/* we may have reached end-of-file previously, and the
152 	   connection may have been closed already; however after
153 	   seeking successfully, the connection must be alive again */
154 	open = true;
155 
156 	seek_state = SeekState::NONE;
157 	InvokeOnAvailable();
158 }
159 
160 std::unique_ptr<Tag>
ReadTag()161 AsyncInputStream::ReadTag() noexcept
162 {
163 	return std::exchange(tag, nullptr);
164 }
165 
166 bool
IsAvailable() const167 AsyncInputStream::IsAvailable() const noexcept
168 {
169 	return postponed_exception ||
170 		IsEOF() ||
171 		!buffer.empty();
172 }
173 
174 size_t
Read(std::unique_lock<Mutex> & lock,void * ptr,size_t read_size)175 AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
176 		       void *ptr, size_t read_size)
177 {
178 	assert(!GetEventLoop().IsInside());
179 
180 	CondInputStreamHandler cond_handler;
181 
182 	/* wait for data */
183 	CircularBuffer<uint8_t>::Range r;
184 	while (true) {
185 		Check();
186 
187 		r = buffer.Read();
188 		if (!r.empty() || IsEOF())
189 			break;
190 
191 		const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
192 		cond_handler.cond.wait(lock);
193 	}
194 
195 	const size_t nbytes = std::min(read_size, r.size);
196 	memcpy(ptr, r.data, nbytes);
197 	buffer.Consume(nbytes);
198 
199 	offset += (offset_type)nbytes;
200 
201 	if (paused && buffer.GetSize() < resume_at)
202 		deferred_resume.Schedule();
203 
204 	return nbytes;
205 }
206 
207 void
CommitWriteBuffer(size_t nbytes)208 AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
209 {
210 	buffer.Append(nbytes);
211 
212 	if (!IsReady())
213 		SetReady();
214 	else
215 		InvokeOnAvailable();
216 }
217 
218 void
AppendToBuffer(const void * data,size_t append_size)219 AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
220 {
221 	auto w = buffer.Write();
222 	assert(!w.empty());
223 
224 	size_t nbytes = std::min(w.size, append_size);
225 	memcpy(w.data, data, nbytes);
226 	buffer.Append(nbytes);
227 
228 	const size_t remaining = append_size - nbytes;
229 	if (remaining > 0) {
230 		w = buffer.Write();
231 		assert(!w.empty());
232 		assert(w.size >= remaining);
233 
234 		memcpy(w.data, (const uint8_t *)data + nbytes, remaining);
235 		buffer.Append(remaining);
236 	}
237 
238 	if (!IsReady())
239 		SetReady();
240 	else
241 		InvokeOnAvailable();
242 }
243 
244 void
DeferredResume()245 AsyncInputStream::DeferredResume() noexcept
246 {
247 	const std::scoped_lock<Mutex> protect(mutex);
248 
249 	try {
250 		Resume();
251 	} catch (...) {
252 		postponed_exception = std::current_exception();
253 		InvokeOnAvailable();
254 	}
255 }
256 
257 void
DeferredSeek()258 AsyncInputStream::DeferredSeek() noexcept
259 {
260 	const std::scoped_lock<Mutex> protect(mutex);
261 	if (seek_state != SeekState::SCHEDULED)
262 		return;
263 
264 	try {
265 		Resume();
266 
267 		seek_state = SeekState::PENDING;
268 		buffer.Clear();
269 		paused = false;
270 
271 		DoSeek(seek_offset);
272 	} catch (...) {
273 		seek_state = SeekState::NONE;
274 		postponed_exception = std::current_exception();
275 		InvokeOnAvailable();
276 	}
277 }
278