1 /*
2 * Copyright 2003-2021 The Music Player Daemon Project
3 * http://www.musicpd.org
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "NfsInputPlugin.hxx"
21 #include "../AsyncInputStream.hxx"
22 #include "../InputPlugin.hxx"
23 #include "lib/nfs/Glue.hxx"
24 #include "lib/nfs/FileReader.hxx"
25
26 /**
27 * Do not buffer more than this number of bytes. It should be a
28 * reasonable limit that doesn't make low-end machines suffer too
29 * much, but doesn't cause stuttering on high-latency lines.
30 */
31 static const size_t NFS_MAX_BUFFERED = 512 * 1024;
32
33 /**
34 * Resume the stream at this number of bytes after it has been paused.
35 */
36 static const size_t NFS_RESUME_AT = 384 * 1024;
37
38 class NfsInputStream final : NfsFileReader, public AsyncInputStream {
39 uint64_t next_offset;
40
41 bool reconnect_on_resume = false, reconnecting = false;
42
43 public:
NfsInputStream(const char * _uri,Mutex & _mutex)44 NfsInputStream(const char *_uri, Mutex &_mutex)
45 :AsyncInputStream(NfsFileReader::GetEventLoop(),
46 _uri, _mutex,
47 NFS_MAX_BUFFERED,
48 NFS_RESUME_AT) {}
49
~NfsInputStream()50 ~NfsInputStream() override {
51 DeferClose();
52 }
53
54 NfsInputStream(const NfsInputStream &) = delete;
55 NfsInputStream &operator=(const NfsInputStream &) = delete;
56
Open()57 void Open() {
58 assert(!IsReady());
59
60 NfsFileReader::Open(GetURI());
61 }
62
63 private:
64 void DoRead();
65
66 protected:
67 /* virtual methods from AsyncInputStream */
68 void DoResume() override;
69 void DoSeek(offset_type new_offset) override;
70
71 private:
72 /* virtual methods from NfsFileReader */
73 void OnNfsFileOpen(uint64_t size) noexcept override;
74 void OnNfsFileRead(const void *data, size_t size) noexcept override;
75 void OnNfsFileError(std::exception_ptr &&e) noexcept override;
76 };
77
78 void
DoRead()79 NfsInputStream::DoRead()
80 {
81 assert(NfsFileReader::IsIdle());
82
83 int64_t remaining = size - next_offset;
84 if (remaining <= 0)
85 return;
86
87 const size_t buffer_space = GetBufferSpace();
88 if (buffer_space == 0) {
89 Pause();
90 return;
91 }
92
93 size_t nbytes = std::min<size_t>(std::min<uint64_t>(remaining, 32768),
94 buffer_space);
95
96 try {
97 const ScopeUnlock unlock(mutex);
98 NfsFileReader::Read(next_offset, nbytes);
99 } catch (...) {
100 postponed_exception = std::current_exception();
101 InvokeOnAvailable();
102 }
103 }
104
105 void
DoResume()106 NfsInputStream::DoResume()
107 {
108 if (reconnect_on_resume) {
109 /* the NFS connection has died while this stream was
110 "paused" - attempt to reconnect */
111
112 reconnect_on_resume = false;
113 reconnecting = true;
114
115 ScopeUnlock unlock(mutex);
116
117 NfsFileReader::Close();
118 NfsFileReader::Open(GetURI());
119
120 return;
121 }
122
123 assert(NfsFileReader::IsIdle());
124
125 DoRead();
126 }
127
128 void
DoSeek(offset_type new_offset)129 NfsInputStream::DoSeek(offset_type new_offset)
130 {
131 {
132 const ScopeUnlock unlock(mutex);
133 NfsFileReader::CancelRead();
134 }
135
136 next_offset = offset = new_offset;
137 SeekDone();
138 DoRead();
139 }
140
141 void
OnNfsFileOpen(uint64_t _size)142 NfsInputStream::OnNfsFileOpen(uint64_t _size) noexcept
143 {
144 const std::scoped_lock<Mutex> protect(mutex);
145
146 if (reconnecting) {
147 /* reconnect has succeeded */
148
149 reconnecting = false;
150 DoRead();
151 return;
152 }
153
154 size = _size;
155 seekable = true;
156 next_offset = 0;
157 SetReady();
158 DoRead();
159 }
160
161 void
OnNfsFileRead(const void * data,size_t data_size)162 NfsInputStream::OnNfsFileRead(const void *data, size_t data_size) noexcept
163 {
164 const std::scoped_lock<Mutex> protect(mutex);
165 assert(!IsBufferFull());
166 assert(IsBufferFull() == (GetBufferSpace() == 0));
167 AppendToBuffer(data, data_size);
168
169 next_offset += data_size;
170
171 DoRead();
172 }
173
174 void
OnNfsFileError(std::exception_ptr && e)175 NfsInputStream::OnNfsFileError(std::exception_ptr &&e) noexcept
176 {
177 const std::scoped_lock<Mutex> protect(mutex);
178
179 if (IsPaused()) {
180 /* while we're paused, don't report this error to the
181 client just yet (it might just be timeout, maybe
182 playback has been paused for quite some time) -
183 wait until the stream gets resumed and try to
184 reconnect, to give it another chance */
185
186 reconnect_on_resume = true;
187 return;
188 }
189
190 postponed_exception = std::move(e);
191
192 if (IsSeekPending())
193 SeekDone();
194 else if (!IsReady())
195 SetReady();
196 else
197 InvokeOnAvailable();
198 }
199
200 /*
201 * InputPlugin methods
202 *
203 */
204
205 static void
input_nfs_init(EventLoop & event_loop,const ConfigBlock &)206 input_nfs_init(EventLoop &event_loop, const ConfigBlock &)
207 {
208 nfs_init(event_loop);
209 }
210
211 static void
input_nfs_finish()212 input_nfs_finish() noexcept
213 {
214 nfs_finish();
215 }
216
217 static InputStreamPtr
input_nfs_open(const char * uri,Mutex & mutex)218 input_nfs_open(const char *uri,
219 Mutex &mutex)
220 {
221 auto is = std::make_unique<NfsInputStream>(uri, mutex);
222 is->Open();
223 return is;
224 }
225
226 static constexpr const char *nfs_prefixes[] = {
227 "nfs://",
228 nullptr
229 };
230
231 const InputPlugin input_plugin_nfs = {
232 "nfs",
233 nfs_prefixes,
234 input_nfs_init,
235 input_nfs_finish,
236 input_nfs_open,
237 nullptr
238 };
239