1 /*
2  * Copyright (C) 2000 Paul Davis & Benno Senoner
3  * Copyright (C) 2019 Robin Gareus <robin@gareus.org>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #ifndef playback_buffer_h
21 #define playback_buffer_h
22 
23 #include <cstring>
24 #include <stdint.h>
25 #include <glibmm.h>
26 
27 #include "pbd/libpbd_visibility.h"
28 #include "pbd/spinlock.h"
29 #include "pbd/g_atomic_compat.h"
30 
31 namespace PBD {
32 
33 template<class T>
34 class /*LIBPBD_API*/ PlaybackBuffer
35 {
36 public:
power_of_two_size(guint sz)37 	static guint power_of_two_size (guint sz) {
38 		int32_t power_of_two;
39 		for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two);
40 		return 1U << power_of_two;
41 	}
42 
43 	PlaybackBuffer (guint sz, guint res = 8191)
reservation(res)44 	: reservation (res)
45 	{
46 		sz += reservation;
47 		size = power_of_two_size (sz);
48 		size_mask = size - 1;
49 		buf = new T[size];
50 
51 		g_atomic_int_set (&read_idx, 0);
52 		reset ();
53 	}
54 
~PlaybackBuffer()55 	virtual ~PlaybackBuffer () {
56 		delete [] buf;
57 	}
58 
59 	/* init (mlock) */
buffer()60 	T *buffer () { return buf; }
61 	/* init (mlock) */
bufsize()62 	guint bufsize () const { return size; }
63 
64 	/* write-thread */
reset()65 	void reset () {
66 		/* writer, when seeking, may block */
67 		Glib::Threads::Mutex::Lock lm (_reset_lock);
68 		SpinLock sl (_reservation_lock);
69 		g_atomic_int_set (&read_idx, 0);
70 		g_atomic_int_set (&write_idx, 0);
71 		g_atomic_int_set (&reserved, 0);
72 	}
73 
74 	/* called from rt (reader) thread for new buffers */
align_to(PlaybackBuffer const & other)75 	void align_to (PlaybackBuffer const& other) {
76 		Glib::Threads::Mutex::Lock lm (_reset_lock);
77 		g_atomic_int_set (&read_idx, g_atomic_int_get (&other.read_idx));
78 		g_atomic_int_set (&write_idx, g_atomic_int_get (&other.write_idx));
79 		g_atomic_int_set (&reserved, g_atomic_int_get (&other.reserved));
80 		memset (buf, 0, size * sizeof (T));
81 	}
82 
83 	/* write-thread */
write_space()84 	guint write_space () const {
85 		guint w, r;
86 
87 		w = g_atomic_int_get (&write_idx);
88 		r = g_atomic_int_get (&read_idx);
89 
90 		guint rv;
91 
92 		if (w > r) {
93 			rv = ((r + size) - w) & size_mask;
94 		} else if (w < r) {
95 			rv = (r - w);
96 		} else {
97 			rv = size;
98 		}
99 		/* it may hapen that the read/invalidation-pointer moves backwards
100 		 * e.g. after rec-stop, declick fade-out.
101 		 * At the same time the butler may already have written data.
102 		 * (it's safe as long as the disk-reader does not move backwards by more
103 		 * than reservation)
104 		 * XXX disk-reading de-click should not move the invalidation-pointer
105 		 */
106 		if (rv > reservation) {
107 			return rv - 1 - reservation;
108 		}
109 		return 0;
110 	}
111 
112 	/* read-thread */
read_space()113 	guint read_space () const {
114 		guint w, r;
115 
116 		w = g_atomic_int_get (&write_idx);
117 		r = g_atomic_int_get (&read_idx);
118 
119 		if (w > r) {
120 			return w - r;
121 		} else {
122 			return ((w + size) - r) & size_mask;
123 		}
124 	}
125 
126 	/* write thread */
overwritable_at(guint r)127 	guint overwritable_at (guint r) const {
128 		guint w;
129 
130 		w = g_atomic_int_get (&write_idx);
131 
132 		if (w > r) {
133 			return w - r;
134 		}
135 		return (w - r + size) & size_mask;
136 	}
137 
138 	/* read-thead */
139 	guint read (T *dest, guint cnt, bool commit = true, guint offset = 0);
140 
141 	/* write-thead */
142 	guint write (T const * src, guint cnt);
143 	/* write-thead */
144 	guint write_zero (guint cnt);
145 	/* read-thead */
increment_write_ptr(guint cnt)146 	guint increment_write_ptr (guint cnt)
147 	{
148 		cnt = std::min (cnt, write_space ());
149 		g_atomic_int_set (&write_idx, (g_atomic_int_get (&write_idx) + cnt) & size_mask);
150 		return cnt;
151 	}
152 
153 	/* read-thead */
decrement_read_ptr(guint cnt)154 	guint decrement_read_ptr (guint cnt)
155 	{
156 		SpinLock sl (_reservation_lock);
157 		guint r = g_atomic_int_get (&read_idx);
158 		guint res = g_atomic_int_get (&reserved);
159 
160 		cnt = std::min (cnt, res);
161 
162 		r = (r + size - cnt) & size_mask;
163 		res -= cnt;
164 
165 		g_atomic_int_set (&read_idx, r);
166 		g_atomic_int_set (&reserved, res);
167 
168 		return cnt;
169 	}
170 
171 	/* read-thead */
increment_read_ptr(guint cnt)172 	guint increment_read_ptr (guint cnt)
173 	{
174 		cnt = std::min (cnt, read_space ());
175 
176 		SpinLock sl (_reservation_lock);
177 		g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
178 		g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + cnt));
179 
180 		return cnt;
181 	}
182 
183 	/* read-thead */
can_seek(int64_t cnt)184 	bool can_seek (int64_t cnt) {
185 		if (cnt > 0) {
186 			return read_space() >= cnt;
187 		} else if (cnt < 0) {
188 			return g_atomic_int_get (&reserved) >= -cnt;
189 		} else {
190 			return true;
191 		}
192 	}
193 
read_ptr()194 	guint read_ptr() const { return g_atomic_int_get (&read_idx); }
write_ptr()195 	guint write_ptr() const { return g_atomic_int_get (&write_idx); }
reserved_size()196 	guint reserved_size() const { return g_atomic_int_get (&reserved); }
reservation_size()197 	guint reservation_size() const { return reservation; }
198 
199 private:
200 	T *buf;
201 	const guint reservation;
202 	guint size;
203 	guint size_mask;
204 
205 	mutable GATOMIC_QUAL gint write_idx;
206 	mutable GATOMIC_QUAL gint read_idx;
207 	mutable GATOMIC_QUAL gint reserved;
208 
209 	/* spinlock will be used to update write_idx and reserved in sync */
210 	spinlock_t _reservation_lock;
211 	/* reset_lock is used to prevent concurrent reading and reset (seek, transport reversal etc). */
212 	Glib::Threads::Mutex _reset_lock;
213 };
214 
215 template<class T> /*LIBPBD_API*/ guint
write(T const * src,guint cnt)216 PlaybackBuffer<T>::write (T const *src, guint cnt)
217 {
218 	guint w = g_atomic_int_get (&write_idx);
219 	const guint free_cnt = write_space ();
220 
221 	if (free_cnt == 0) {
222 		return 0;
223 	}
224 
225 	const guint to_write = cnt > free_cnt ? free_cnt : cnt;
226 	const guint cnt2 = w + to_write;
227 
228 	guint n1, n2;
229 	if (cnt2 > size) {
230 		n1 = size - w;
231 		n2 = cnt2 & size_mask;
232 	} else {
233 		n1 = to_write;
234 		n2 = 0;
235 	}
236 
237 	memcpy (&buf[w], src, n1 * sizeof (T));
238 	w = (w + n1) & size_mask;
239 
240 	if (n2) {
241 		memcpy (buf, src+n1, n2 * sizeof (T));
242 		w = n2;
243 	}
244 
245 	g_atomic_int_set (&write_idx, w);
246 	return to_write;
247 }
248 
249 template<class T> /*LIBPBD_API*/ guint
write_zero(guint cnt)250 PlaybackBuffer<T>::write_zero (guint cnt)
251 {
252 	guint w = g_atomic_int_get (&write_idx);
253 	const guint free_cnt = write_space ();
254 
255 	if (free_cnt == 0) {
256 		return 0;
257 	}
258 
259 	const guint to_write = cnt > free_cnt ? free_cnt : cnt;
260 	const guint cnt2 = w + to_write;
261 
262 	guint n1, n2;
263 	if (cnt2 > size) {
264 		n1 = size - w;
265 		n2 = cnt2 & size_mask;
266 	} else {
267 		n1 = to_write;
268 		n2 = 0;
269 	}
270 
271 	memset (&buf[w], 0, n1 * sizeof (T));
272 	w = (w + n1) & size_mask;
273 
274 	if (n2) {
275 		memset (buf, 0, n2 * sizeof (T));
276 		w = n2;
277 	}
278 
279 	g_atomic_int_set (&write_idx, w);
280 	return to_write;
281 }
282 
283 template<class T> /*LIBPBD_API*/ guint
read(T * dest,guint cnt,bool commit,guint offset)284 PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
285 {
286 	Glib::Threads::Mutex::Lock lm (_reset_lock, Glib::Threads::TRY_LOCK);
287 	if (!lm.locked ()) {
288 		/* seek, reset in progress */
289 		return 0;
290 	}
291 
292 	guint r = g_atomic_int_get (&read_idx);
293 	const guint w = g_atomic_int_get (&write_idx);
294 
295 	guint free_cnt = (w > r) ? (w - r) : ((w - r + size) & size_mask);
296 
297 	if (!commit && offset > 0) {
298 		if (offset > free_cnt) {
299 			return 0;
300 		}
301 		free_cnt -= offset;
302 		r = (r + offset) & size_mask;
303 	}
304 
305 	const guint to_read = cnt > free_cnt ? free_cnt : cnt;
306 
307 	const guint cnt2 = r + to_read;
308 
309 	guint n1, n2;
310 	if (cnt2 > size) {
311 		n1 = size - r;
312 		n2 = cnt2 & size_mask;
313 	} else {
314 		n1 = to_read;
315 		n2 = 0;
316 	}
317 
318 	memcpy (dest, &buf[r], n1 * sizeof (T));
319 	r = (r + n1) & size_mask;
320 
321 	if (n2) {
322 		memcpy (dest + n1, buf, n2 * sizeof (T));
323 		r = n2;
324 	}
325 
326 	if (commit) {
327 		SpinLock sl (_reservation_lock);
328 		g_atomic_int_set (&read_idx, r);
329 		g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + to_read));
330 	}
331 	return to_read;
332 }
333 
334 } /* end namespace */
335 
#endif /* playback_buffer_h */
337