1 /*
2  * Copyright 2003-2021 The Music Player Daemon Project
3  * http://www.musicpd.org
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #include "NfsInputPlugin.hxx"
21 #include "../AsyncInputStream.hxx"
22 #include "../InputPlugin.hxx"
23 #include "lib/nfs/Glue.hxx"
24 #include "lib/nfs/FileReader.hxx"
25 
/**
 * Upper bound for the number of bytes held in the stream buffer; it
 * is handed to the #AsyncInputStream constructor.  The value is meant
 * to be small enough not to hurt low-end machines, yet large enough
 * to avoid stuttering on high-latency lines.
 */
static constexpr size_t NFS_MAX_BUFFERED = 512 * 1024;
32 
/**
 * After the stream has been paused, resume it at this number of
 * bytes.  The value is handed to the #AsyncInputStream constructor.
 */
static constexpr size_t NFS_RESUME_AT = 384 * 1024;
37 
38 class NfsInputStream final : NfsFileReader, public AsyncInputStream {
39 	uint64_t next_offset;
40 
41 	bool reconnect_on_resume = false, reconnecting = false;
42 
43 public:
NfsInputStream(const char * _uri,Mutex & _mutex)44 	NfsInputStream(const char *_uri, Mutex &_mutex)
45 		:AsyncInputStream(NfsFileReader::GetEventLoop(),
46 				  _uri, _mutex,
47 				  NFS_MAX_BUFFERED,
48 				  NFS_RESUME_AT) {}
49 
~NfsInputStream()50 	~NfsInputStream() override {
51 		DeferClose();
52 	}
53 
54 	NfsInputStream(const NfsInputStream &) = delete;
55 	NfsInputStream &operator=(const NfsInputStream &) = delete;
56 
Open()57 	void Open() {
58 		assert(!IsReady());
59 
60 		NfsFileReader::Open(GetURI());
61 	}
62 
63 private:
64 	void DoRead();
65 
66 protected:
67 	/* virtual methods from AsyncInputStream */
68 	void DoResume() override;
69 	void DoSeek(offset_type new_offset) override;
70 
71 private:
72 	/* virtual methods from NfsFileReader */
73 	void OnNfsFileOpen(uint64_t size) noexcept override;
74 	void OnNfsFileRead(const void *data, size_t size) noexcept override;
75 	void OnNfsFileError(std::exception_ptr &&e) noexcept override;
76 };
77 
78 void
DoRead()79 NfsInputStream::DoRead()
80 {
81 	assert(NfsFileReader::IsIdle());
82 
83 	int64_t remaining = size - next_offset;
84 	if (remaining <= 0)
85 		return;
86 
87 	const size_t buffer_space = GetBufferSpace();
88 	if (buffer_space == 0) {
89 		Pause();
90 		return;
91 	}
92 
93 	size_t nbytes = std::min<size_t>(std::min<uint64_t>(remaining, 32768),
94 					 buffer_space);
95 
96 	try {
97 		const ScopeUnlock unlock(mutex);
98 		NfsFileReader::Read(next_offset, nbytes);
99 	} catch (...) {
100 		postponed_exception = std::current_exception();
101 		InvokeOnAvailable();
102 	}
103 }
104 
105 void
DoResume()106 NfsInputStream::DoResume()
107 {
108 	if (reconnect_on_resume) {
109 		/* the NFS connection has died while this stream was
110 		   "paused" - attempt to reconnect */
111 
112 		reconnect_on_resume = false;
113 		reconnecting = true;
114 
115 		ScopeUnlock unlock(mutex);
116 
117 		NfsFileReader::Close();
118 		NfsFileReader::Open(GetURI());
119 
120 		return;
121 	}
122 
123 	assert(NfsFileReader::IsIdle());
124 
125 	DoRead();
126 }
127 
128 void
DoSeek(offset_type new_offset)129 NfsInputStream::DoSeek(offset_type new_offset)
130 {
131 	{
132 		const ScopeUnlock unlock(mutex);
133 		NfsFileReader::CancelRead();
134 	}
135 
136 	next_offset = offset = new_offset;
137 	SeekDone();
138 	DoRead();
139 }
140 
141 void
OnNfsFileOpen(uint64_t _size)142 NfsInputStream::OnNfsFileOpen(uint64_t _size) noexcept
143 {
144 	const std::scoped_lock<Mutex> protect(mutex);
145 
146 	if (reconnecting) {
147 		/* reconnect has succeeded */
148 
149 		reconnecting = false;
150 		DoRead();
151 		return;
152 	}
153 
154 	size = _size;
155 	seekable = true;
156 	next_offset = 0;
157 	SetReady();
158 	DoRead();
159 }
160 
161 void
OnNfsFileRead(const void * data,size_t data_size)162 NfsInputStream::OnNfsFileRead(const void *data, size_t data_size) noexcept
163 {
164 	const std::scoped_lock<Mutex> protect(mutex);
165 	assert(!IsBufferFull());
166 	assert(IsBufferFull() == (GetBufferSpace() == 0));
167 	AppendToBuffer(data, data_size);
168 
169 	next_offset += data_size;
170 
171 	DoRead();
172 }
173 
174 void
OnNfsFileError(std::exception_ptr && e)175 NfsInputStream::OnNfsFileError(std::exception_ptr &&e) noexcept
176 {
177 	const std::scoped_lock<Mutex> protect(mutex);
178 
179 	if (IsPaused()) {
180 		/* while we're paused, don't report this error to the
181 		   client just yet (it might just be timeout, maybe
182 		   playback has been paused for quite some time) -
183 		   wait until the stream gets resumed and try to
184 		   reconnect, to give it another chance */
185 
186 		reconnect_on_resume = true;
187 		return;
188 	}
189 
190 	postponed_exception = std::move(e);
191 
192 	if (IsSeekPending())
193 		SeekDone();
194 	else if (!IsReady())
195 		SetReady();
196 	else
197 		InvokeOnAvailable();
198 }
199 
200 /*
201  * InputPlugin methods
202  *
203  */
204 
205 static void
input_nfs_init(EventLoop & event_loop,const ConfigBlock &)206 input_nfs_init(EventLoop &event_loop, const ConfigBlock &)
207 {
208 	nfs_init(event_loop);
209 }
210 
211 static void
input_nfs_finish()212 input_nfs_finish() noexcept
213 {
214 	nfs_finish();
215 }
216 
217 static InputStreamPtr
input_nfs_open(const char * uri,Mutex & mutex)218 input_nfs_open(const char *uri,
219 	       Mutex &mutex)
220 {
221 	auto is = std::make_unique<NfsInputStream>(uri, mutex);
222 	is->Open();
223 	return is;
224 }
225 
/**
 * The nullptr-terminated list of URI prefixes registered in
 * #input_plugin_nfs.
 */
static constexpr const char *nfs_prefixes[] = { "nfs://", nullptr };
230 
231 const InputPlugin input_plugin_nfs = {
232 	"nfs",
233 	nfs_prefixes,
234 	input_nfs_init,
235 	input_nfs_finish,
236 	input_nfs_open,
237 	nullptr
238 };
239