1 /*
2  * Copyright 2003-2021 The Music Player Daemon Project
3  * http://www.musicpd.org
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #include "BufferingInputStream.hxx"
21 #include "InputStream.hxx"
22 #include "thread/Name.hxx"
23 
24 #include <string.h>
25 
/**
 * Construct the buffering wrapper: move the given stream into this
 * object, register this object as the stream's handler and start the
 * thread whose entry point is RunThread().
 *
 * @param _input the stream to be buffered; it is moved into the
 * "input" member
 */
BufferingInputStream::BufferingInputStream(InputStreamPtr _input)
	:input(std::move(_input)),
	 /* the following initializers dereference "input", i.e. they
	    rely on "input" being initialized before them
	    (NOTE(review): depends on the member declaration order in
	    the header, which is not visible here) */
	 mutex(input->mutex),
	 thread(BIND_THIS_METHOD(RunThread)),
	 /* the buffer is constructed from the size reported by the
	    wrapped stream */
	 buffer(input->GetSize())
{
	input->SetHandler(this);

	/* launch the thread only after the handler has been
	   registered */
	thread.Start();
}
36 
~BufferingInputStream()37 BufferingInputStream::~BufferingInputStream() noexcept
38 {
39 	{
40 		const std::scoped_lock<Mutex> lock(mutex);
41 		stop = true;
42 		wake_cond.notify_one();
43 	}
44 
45 	thread.Join();
46 }
47 
48 void
Check()49 BufferingInputStream::Check()
50 {
51 	if (error)
52 		std::rethrow_exception(error);
53 
54 	if (input)
55 		input->Check();
56 }
57 
58 bool
IsAvailable(size_t offset) const59 BufferingInputStream::IsAvailable(size_t offset) const noexcept
60 {
61 	if (offset >= size() || error)
62 		return true;
63 
64 	if (buffer.Read(offset).HasData())
65 		return true;
66 
67 	/* if no data is available now, make sure it will be soon */
68 	if (want_offset == INVALID_OFFSET)
69 		want_offset = offset;
70 
71 	return false;
72 }
73 
74 size_t
Read(std::unique_lock<Mutex> & lock,size_t offset,void * ptr,size_t s)75 BufferingInputStream::Read(std::unique_lock<Mutex> &lock, size_t offset,
76 			   void *ptr, size_t s)
77 {
78 	if (offset >= size())
79 		return 0;
80 
81 	while (true) {
82 		auto r = buffer.Read(offset);
83 		if (r.HasData()) {
84 			/* yay, we have some data */
85 			size_t nbytes = std::min(s, r.defined_buffer.size);
86 			memcpy(ptr, r.defined_buffer.data, nbytes);
87 			return nbytes;
88 		}
89 
90 		if (error)
91 			std::rethrow_exception(error);
92 
93 		if (want_offset == INVALID_OFFSET)
94 			want_offset = offset;
95 
96 		client_cond.wait(lock);
97 	}
98 }
99 
100 size_t
FindFirstHole() const101 BufferingInputStream::FindFirstHole() const noexcept
102 {
103 	auto r = buffer.Read(0);
104 	if (r.undefined_size > 0)
105 		/* a hole at the beginning */
106 		return 0;
107 
108 	if (r.defined_buffer.size < size())
109 		/* a hole in the middle */
110 		return r.defined_buffer.size;
111 
112 	/* the file has been read completely */
113 	return INVALID_OFFSET;
114 }
115 
116 inline void
RunThreadLocked(std::unique_lock<Mutex> & lock)117 BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock)
118 {
119 	while (!stop) {
120 		if (want_offset != INVALID_OFFSET) {
121 			assert(want_offset < size());
122 
123 			const size_t seek_offset = want_offset;
124 			want_offset = INVALID_OFFSET;
125 			if (!buffer.Read(seek_offset).HasData())
126 				input->Seek(lock, seek_offset);
127 		} else if (input->IsEOF()) {
128 			/* our input has reached its end: prepare
129 			   reading the first remaining hole */
130 
131 			size_t new_offset = FindFirstHole();
132 			if (new_offset == INVALID_OFFSET) {
133 				/* the file has been read completely */
134 				break;
135 			}
136 
137 			/* seek to the first hole */
138 			input->Seek(lock, new_offset);
139 		} else if (input->IsAvailable()) {
140 			const auto read_offset = input->GetOffset();
141 			auto w = buffer.Write(read_offset);
142 
143 			if (w.empty()) {
144 				size_t new_offset = FindFirstHole();
145 				if (new_offset == INVALID_OFFSET)
146 					/* the file has been read
147 					   completely */
148 					break;
149 
150 				input->Seek(lock, new_offset);
151 
152 				continue;
153 			}
154 
155 			/* enforce an upper limit for each
156 			   InputStream::Read() call; this is necessary
157 			   for plugins which are unable to do partial
158 			   reads, e.g. when reading local files, the
159 			   read() system call will not return until
160 			   all requested bytes have been read from the
161 			   hard disk, instead of returning when "some"
162 			   data has been read */
163 			constexpr size_t MAX_READ = 64 * 1024;
164 			if (w.size > MAX_READ)
165 				w.size = MAX_READ;
166 
167 			size_t nbytes = input->Read(lock, w.data, w.size);
168 			buffer.Commit(read_offset, read_offset + nbytes);
169 
170 			client_cond.notify_all();
171 			OnBufferAvailable();
172 		} else
173 			wake_cond.wait(lock);
174 	}
175 }
176 
177 void
RunThread()178 BufferingInputStream::RunThread() noexcept
179 {
180 	SetThreadName("buffering");
181 
182 	std::unique_lock<Mutex> lock(mutex);
183 
184 	try {
185 		RunThreadLocked(lock);
186 	} catch (...) {
187 		error = std::current_exception();
188 		client_cond.notify_all();
189 		OnBufferAvailable();
190 	}
191 
192 	/* clear the "input" attribute while holding the mutex */
193 	auto _input = std::move(input);
194 
195 	/* the mutex must be unlocked while an InputStream can be
196 	   destructed */
197 	lock.unlock();
198 
199 	/* and now actually destruct the InputStream */
200 	_input.reset();
201 }
202