1 /*
2 * Copyright 2003-2021 The Music Player Daemon Project
3 * http://www.musicpd.org
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "BufferingInputStream.hxx"
21 #include "InputStream.hxx"
22 #include "thread/Name.hxx"
23
24 #include <string.h>
25
BufferingInputStream(InputStreamPtr _input)26 BufferingInputStream::BufferingInputStream(InputStreamPtr _input)
27 :input(std::move(_input)),
28 mutex(input->mutex),
29 thread(BIND_THIS_METHOD(RunThread)),
30 buffer(input->GetSize())
31 {
32 input->SetHandler(this);
33
34 thread.Start();
35 }
36
~BufferingInputStream()37 BufferingInputStream::~BufferingInputStream() noexcept
38 {
39 {
40 const std::scoped_lock<Mutex> lock(mutex);
41 stop = true;
42 wake_cond.notify_one();
43 }
44
45 thread.Join();
46 }
47
48 void
Check()49 BufferingInputStream::Check()
50 {
51 if (error)
52 std::rethrow_exception(error);
53
54 if (input)
55 input->Check();
56 }
57
58 bool
IsAvailable(size_t offset) const59 BufferingInputStream::IsAvailable(size_t offset) const noexcept
60 {
61 if (offset >= size() || error)
62 return true;
63
64 if (buffer.Read(offset).HasData())
65 return true;
66
67 /* if no data is available now, make sure it will be soon */
68 if (want_offset == INVALID_OFFSET)
69 want_offset = offset;
70
71 return false;
72 }
73
74 size_t
Read(std::unique_lock<Mutex> & lock,size_t offset,void * ptr,size_t s)75 BufferingInputStream::Read(std::unique_lock<Mutex> &lock, size_t offset,
76 void *ptr, size_t s)
77 {
78 if (offset >= size())
79 return 0;
80
81 while (true) {
82 auto r = buffer.Read(offset);
83 if (r.HasData()) {
84 /* yay, we have some data */
85 size_t nbytes = std::min(s, r.defined_buffer.size);
86 memcpy(ptr, r.defined_buffer.data, nbytes);
87 return nbytes;
88 }
89
90 if (error)
91 std::rethrow_exception(error);
92
93 if (want_offset == INVALID_OFFSET)
94 want_offset = offset;
95
96 client_cond.wait(lock);
97 }
98 }
99
100 size_t
FindFirstHole() const101 BufferingInputStream::FindFirstHole() const noexcept
102 {
103 auto r = buffer.Read(0);
104 if (r.undefined_size > 0)
105 /* a hole at the beginning */
106 return 0;
107
108 if (r.defined_buffer.size < size())
109 /* a hole in the middle */
110 return r.defined_buffer.size;
111
112 /* the file has been read completely */
113 return INVALID_OFFSET;
114 }
115
116 inline void
RunThreadLocked(std::unique_lock<Mutex> & lock)117 BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock)
118 {
119 while (!stop) {
120 if (want_offset != INVALID_OFFSET) {
121 assert(want_offset < size());
122
123 const size_t seek_offset = want_offset;
124 want_offset = INVALID_OFFSET;
125 if (!buffer.Read(seek_offset).HasData())
126 input->Seek(lock, seek_offset);
127 } else if (input->IsEOF()) {
128 /* our input has reached its end: prepare
129 reading the first remaining hole */
130
131 size_t new_offset = FindFirstHole();
132 if (new_offset == INVALID_OFFSET) {
133 /* the file has been read completely */
134 break;
135 }
136
137 /* seek to the first hole */
138 input->Seek(lock, new_offset);
139 } else if (input->IsAvailable()) {
140 const auto read_offset = input->GetOffset();
141 auto w = buffer.Write(read_offset);
142
143 if (w.empty()) {
144 size_t new_offset = FindFirstHole();
145 if (new_offset == INVALID_OFFSET)
146 /* the file has been read
147 completely */
148 break;
149
150 input->Seek(lock, new_offset);
151
152 continue;
153 }
154
155 /* enforce an upper limit for each
156 InputStream::Read() call; this is necessary
157 for plugins which are unable to do partial
158 reads, e.g. when reading local files, the
159 read() system call will not return until
160 all requested bytes have been read from the
161 hard disk, instead of returning when "some"
162 data has been read */
163 constexpr size_t MAX_READ = 64 * 1024;
164 if (w.size > MAX_READ)
165 w.size = MAX_READ;
166
167 size_t nbytes = input->Read(lock, w.data, w.size);
168 buffer.Commit(read_offset, read_offset + nbytes);
169
170 client_cond.notify_all();
171 OnBufferAvailable();
172 } else
173 wake_cond.wait(lock);
174 }
175 }
176
177 void
RunThread()178 BufferingInputStream::RunThread() noexcept
179 {
180 SetThreadName("buffering");
181
182 std::unique_lock<Mutex> lock(mutex);
183
184 try {
185 RunThreadLocked(lock);
186 } catch (...) {
187 error = std::current_exception();
188 client_cond.notify_all();
189 OnBufferAvailable();
190 }
191
192 /* clear the "input" attribute while holding the mutex */
193 auto _input = std::move(input);
194
195 /* the mutex must be unlocked while an InputStream can be
196 destructed */
197 lock.unlock();
198
199 /* and now actually destruct the InputStream */
200 _input.reset();
201 }
202