1 /*
2 * Copyright 2003-2021 The Music Player Daemon Project
3 * http://www.musicpd.org
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "AsyncInputStream.hxx"
21 #include "CondHandler.hxx"
22 #include "tag/Tag.hxx"
23 #include "thread/Cond.hxx"
24 #include "event/Loop.hxx"
25
26 #include <cassert>
27 #include <stdexcept>
28
29 #include <string.h>
30
AsyncInputStream(EventLoop & event_loop,const char * _url,Mutex & _mutex,size_t _buffer_size,size_t _resume_at)31 AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
32 Mutex &_mutex,
33 size_t _buffer_size,
34 size_t _resume_at) noexcept
35 :InputStream(_url, _mutex),
36 deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
37 deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
38 allocation(_buffer_size),
39 buffer(&allocation.front(), allocation.size()),
40 resume_at(_resume_at)
41 {
42 allocation.ForkCow(false);
43 }
44
~AsyncInputStream()45 AsyncInputStream::~AsyncInputStream() noexcept
46 {
47 buffer.Clear();
48 }
49
50 void
SetTag(std::unique_ptr<Tag> _tag)51 AsyncInputStream::SetTag(std::unique_ptr<Tag> _tag) noexcept
52 {
53 tag = std::move(_tag);
54 }
55
56 void
ClearTag()57 AsyncInputStream::ClearTag() noexcept
58 {
59 tag.reset();
60 }
61
62 void
Pause()63 AsyncInputStream::Pause() noexcept
64 {
65 assert(GetEventLoop().IsInside());
66
67 paused = true;
68 }
69
70 inline void
Resume()71 AsyncInputStream::Resume()
72 {
73 assert(GetEventLoop().IsInside());
74
75 if (paused) {
76 paused = false;
77
78 DoResume();
79 }
80 }
81
82 void
Check()83 AsyncInputStream::Check()
84 {
85 if (postponed_exception)
86 std::rethrow_exception(std::exchange(postponed_exception,
87 std::exception_ptr()));
88 }
89
90 bool
IsEOF() const91 AsyncInputStream::IsEOF() const noexcept
92 {
93 return (KnownSize() && offset >= size) ||
94 (!open && buffer.empty());
95 }
96
97 void
Seek(std::unique_lock<Mutex> & lock,offset_type new_offset)98 AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
99 offset_type new_offset)
100 {
101 assert(IsReady());
102 assert(seek_state == SeekState::NONE);
103
104 if (new_offset == offset)
105 /* no-op */
106 return;
107
108 if (!IsSeekable())
109 throw std::runtime_error("Not seekable");
110
111 /* check if we can fast-forward the buffer */
112
113 while (new_offset > offset) {
114 auto r = buffer.Read();
115 if (r.empty())
116 break;
117
118 const size_t nbytes =
119 new_offset - offset < (offset_type)r.size
120 ? new_offset - offset
121 : r.size;
122
123 buffer.Consume(nbytes);
124 offset += nbytes;
125 }
126
127 if (new_offset == offset)
128 return;
129
130 /* no: ask the implementation to seek */
131
132 seek_offset = new_offset;
133 seek_state = SeekState::SCHEDULED;
134
135 deferred_seek.Schedule();
136
137 CondInputStreamHandler cond_handler;
138 const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
139 cond_handler.cond.wait(lock,
140 [this]{ return seek_state == SeekState::NONE; });
141
142 Check();
143 }
144
145 void
SeekDone()146 AsyncInputStream::SeekDone() noexcept
147 {
148 assert(GetEventLoop().IsInside());
149 assert(IsSeekPending());
150
151 /* we may have reached end-of-file previously, and the
152 connection may have been closed already; however after
153 seeking successfully, the connection must be alive again */
154 open = true;
155
156 seek_state = SeekState::NONE;
157 InvokeOnAvailable();
158 }
159
160 std::unique_ptr<Tag>
ReadTag()161 AsyncInputStream::ReadTag() noexcept
162 {
163 return std::exchange(tag, nullptr);
164 }
165
166 bool
IsAvailable() const167 AsyncInputStream::IsAvailable() const noexcept
168 {
169 return postponed_exception ||
170 IsEOF() ||
171 !buffer.empty();
172 }
173
174 size_t
Read(std::unique_lock<Mutex> & lock,void * ptr,size_t read_size)175 AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
176 void *ptr, size_t read_size)
177 {
178 assert(!GetEventLoop().IsInside());
179
180 CondInputStreamHandler cond_handler;
181
182 /* wait for data */
183 CircularBuffer<uint8_t>::Range r;
184 while (true) {
185 Check();
186
187 r = buffer.Read();
188 if (!r.empty() || IsEOF())
189 break;
190
191 const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
192 cond_handler.cond.wait(lock);
193 }
194
195 const size_t nbytes = std::min(read_size, r.size);
196 memcpy(ptr, r.data, nbytes);
197 buffer.Consume(nbytes);
198
199 offset += (offset_type)nbytes;
200
201 if (paused && buffer.GetSize() < resume_at)
202 deferred_resume.Schedule();
203
204 return nbytes;
205 }
206
207 void
CommitWriteBuffer(size_t nbytes)208 AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
209 {
210 buffer.Append(nbytes);
211
212 if (!IsReady())
213 SetReady();
214 else
215 InvokeOnAvailable();
216 }
217
218 void
AppendToBuffer(const void * data,size_t append_size)219 AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
220 {
221 auto w = buffer.Write();
222 assert(!w.empty());
223
224 size_t nbytes = std::min(w.size, append_size);
225 memcpy(w.data, data, nbytes);
226 buffer.Append(nbytes);
227
228 const size_t remaining = append_size - nbytes;
229 if (remaining > 0) {
230 w = buffer.Write();
231 assert(!w.empty());
232 assert(w.size >= remaining);
233
234 memcpy(w.data, (const uint8_t *)data + nbytes, remaining);
235 buffer.Append(remaining);
236 }
237
238 if (!IsReady())
239 SetReady();
240 else
241 InvokeOnAvailable();
242 }
243
244 void
DeferredResume()245 AsyncInputStream::DeferredResume() noexcept
246 {
247 const std::scoped_lock<Mutex> protect(mutex);
248
249 try {
250 Resume();
251 } catch (...) {
252 postponed_exception = std::current_exception();
253 InvokeOnAvailable();
254 }
255 }
256
257 void
DeferredSeek()258 AsyncInputStream::DeferredSeek() noexcept
259 {
260 const std::scoped_lock<Mutex> protect(mutex);
261 if (seek_state != SeekState::SCHEDULED)
262 return;
263
264 try {
265 Resume();
266
267 seek_state = SeekState::PENDING;
268 buffer.Clear();
269 paused = false;
270
271 DoSeek(seek_offset);
272 } catch (...) {
273 seek_state = SeekState::NONE;
274 postponed_exception = std::current_exception();
275 InvokeOnAvailable();
276 }
277 }
278