1 /*
2 * Copyright (C) 2000 Paul Davis & Benno Senoner
3 * Copyright (C) 2019 Robin Gareus <robin@gareus.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifndef playback_buffer_h
21 #define playback_buffer_h
22
#include <algorithm>
#include <cstring>
#include <stdint.h>
#include <glibmm.h>

#include "pbd/libpbd_visibility.h"
#include "pbd/spinlock.h"
#include "pbd/g_atomic_compat.h"
30
31 namespace PBD {
32
33 template<class T>
34 class /*LIBPBD_API*/ PlaybackBuffer
35 {
36 public:
power_of_two_size(guint sz)37 static guint power_of_two_size (guint sz) {
38 int32_t power_of_two;
39 for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two);
40 return 1U << power_of_two;
41 }
42
43 PlaybackBuffer (guint sz, guint res = 8191)
reservation(res)44 : reservation (res)
45 {
46 sz += reservation;
47 size = power_of_two_size (sz);
48 size_mask = size - 1;
49 buf = new T[size];
50
51 g_atomic_int_set (&read_idx, 0);
52 reset ();
53 }
54
~PlaybackBuffer()55 virtual ~PlaybackBuffer () {
56 delete [] buf;
57 }
58
59 /* init (mlock) */
buffer()60 T *buffer () { return buf; }
61 /* init (mlock) */
bufsize()62 guint bufsize () const { return size; }
63
64 /* write-thread */
reset()65 void reset () {
66 /* writer, when seeking, may block */
67 Glib::Threads::Mutex::Lock lm (_reset_lock);
68 SpinLock sl (_reservation_lock);
69 g_atomic_int_set (&read_idx, 0);
70 g_atomic_int_set (&write_idx, 0);
71 g_atomic_int_set (&reserved, 0);
72 }
73
74 /* called from rt (reader) thread for new buffers */
align_to(PlaybackBuffer const & other)75 void align_to (PlaybackBuffer const& other) {
76 Glib::Threads::Mutex::Lock lm (_reset_lock);
77 g_atomic_int_set (&read_idx, g_atomic_int_get (&other.read_idx));
78 g_atomic_int_set (&write_idx, g_atomic_int_get (&other.write_idx));
79 g_atomic_int_set (&reserved, g_atomic_int_get (&other.reserved));
80 memset (buf, 0, size * sizeof (T));
81 }
82
83 /* write-thread */
write_space()84 guint write_space () const {
85 guint w, r;
86
87 w = g_atomic_int_get (&write_idx);
88 r = g_atomic_int_get (&read_idx);
89
90 guint rv;
91
92 if (w > r) {
93 rv = ((r + size) - w) & size_mask;
94 } else if (w < r) {
95 rv = (r - w);
96 } else {
97 rv = size;
98 }
99 /* it may hapen that the read/invalidation-pointer moves backwards
100 * e.g. after rec-stop, declick fade-out.
101 * At the same time the butler may already have written data.
102 * (it's safe as long as the disk-reader does not move backwards by more
103 * than reservation)
104 * XXX disk-reading de-click should not move the invalidation-pointer
105 */
106 if (rv > reservation) {
107 return rv - 1 - reservation;
108 }
109 return 0;
110 }
111
112 /* read-thread */
read_space()113 guint read_space () const {
114 guint w, r;
115
116 w = g_atomic_int_get (&write_idx);
117 r = g_atomic_int_get (&read_idx);
118
119 if (w > r) {
120 return w - r;
121 } else {
122 return ((w + size) - r) & size_mask;
123 }
124 }
125
126 /* write thread */
overwritable_at(guint r)127 guint overwritable_at (guint r) const {
128 guint w;
129
130 w = g_atomic_int_get (&write_idx);
131
132 if (w > r) {
133 return w - r;
134 }
135 return (w - r + size) & size_mask;
136 }
137
138 /* read-thead */
139 guint read (T *dest, guint cnt, bool commit = true, guint offset = 0);
140
141 /* write-thead */
142 guint write (T const * src, guint cnt);
143 /* write-thead */
144 guint write_zero (guint cnt);
145 /* read-thead */
increment_write_ptr(guint cnt)146 guint increment_write_ptr (guint cnt)
147 {
148 cnt = std::min (cnt, write_space ());
149 g_atomic_int_set (&write_idx, (g_atomic_int_get (&write_idx) + cnt) & size_mask);
150 return cnt;
151 }
152
153 /* read-thead */
decrement_read_ptr(guint cnt)154 guint decrement_read_ptr (guint cnt)
155 {
156 SpinLock sl (_reservation_lock);
157 guint r = g_atomic_int_get (&read_idx);
158 guint res = g_atomic_int_get (&reserved);
159
160 cnt = std::min (cnt, res);
161
162 r = (r + size - cnt) & size_mask;
163 res -= cnt;
164
165 g_atomic_int_set (&read_idx, r);
166 g_atomic_int_set (&reserved, res);
167
168 return cnt;
169 }
170
171 /* read-thead */
increment_read_ptr(guint cnt)172 guint increment_read_ptr (guint cnt)
173 {
174 cnt = std::min (cnt, read_space ());
175
176 SpinLock sl (_reservation_lock);
177 g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
178 g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + cnt));
179
180 return cnt;
181 }
182
183 /* read-thead */
can_seek(int64_t cnt)184 bool can_seek (int64_t cnt) {
185 if (cnt > 0) {
186 return read_space() >= cnt;
187 } else if (cnt < 0) {
188 return g_atomic_int_get (&reserved) >= -cnt;
189 } else {
190 return true;
191 }
192 }
193
read_ptr()194 guint read_ptr() const { return g_atomic_int_get (&read_idx); }
write_ptr()195 guint write_ptr() const { return g_atomic_int_get (&write_idx); }
reserved_size()196 guint reserved_size() const { return g_atomic_int_get (&reserved); }
reservation_size()197 guint reservation_size() const { return reservation; }
198
199 private:
200 T *buf;
201 const guint reservation;
202 guint size;
203 guint size_mask;
204
205 mutable GATOMIC_QUAL gint write_idx;
206 mutable GATOMIC_QUAL gint read_idx;
207 mutable GATOMIC_QUAL gint reserved;
208
209 /* spinlock will be used to update write_idx and reserved in sync */
210 spinlock_t _reservation_lock;
211 /* reset_lock is used to prevent concurrent reading and reset (seek, transport reversal etc). */
212 Glib::Threads::Mutex _reset_lock;
213 };
214
215 template<class T> /*LIBPBD_API*/ guint
write(T const * src,guint cnt)216 PlaybackBuffer<T>::write (T const *src, guint cnt)
217 {
218 guint w = g_atomic_int_get (&write_idx);
219 const guint free_cnt = write_space ();
220
221 if (free_cnt == 0) {
222 return 0;
223 }
224
225 const guint to_write = cnt > free_cnt ? free_cnt : cnt;
226 const guint cnt2 = w + to_write;
227
228 guint n1, n2;
229 if (cnt2 > size) {
230 n1 = size - w;
231 n2 = cnt2 & size_mask;
232 } else {
233 n1 = to_write;
234 n2 = 0;
235 }
236
237 memcpy (&buf[w], src, n1 * sizeof (T));
238 w = (w + n1) & size_mask;
239
240 if (n2) {
241 memcpy (buf, src+n1, n2 * sizeof (T));
242 w = n2;
243 }
244
245 g_atomic_int_set (&write_idx, w);
246 return to_write;
247 }
248
249 template<class T> /*LIBPBD_API*/ guint
write_zero(guint cnt)250 PlaybackBuffer<T>::write_zero (guint cnt)
251 {
252 guint w = g_atomic_int_get (&write_idx);
253 const guint free_cnt = write_space ();
254
255 if (free_cnt == 0) {
256 return 0;
257 }
258
259 const guint to_write = cnt > free_cnt ? free_cnt : cnt;
260 const guint cnt2 = w + to_write;
261
262 guint n1, n2;
263 if (cnt2 > size) {
264 n1 = size - w;
265 n2 = cnt2 & size_mask;
266 } else {
267 n1 = to_write;
268 n2 = 0;
269 }
270
271 memset (&buf[w], 0, n1 * sizeof (T));
272 w = (w + n1) & size_mask;
273
274 if (n2) {
275 memset (buf, 0, n2 * sizeof (T));
276 w = n2;
277 }
278
279 g_atomic_int_set (&write_idx, w);
280 return to_write;
281 }
282
283 template<class T> /*LIBPBD_API*/ guint
read(T * dest,guint cnt,bool commit,guint offset)284 PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
285 {
286 Glib::Threads::Mutex::Lock lm (_reset_lock, Glib::Threads::TRY_LOCK);
287 if (!lm.locked ()) {
288 /* seek, reset in progress */
289 return 0;
290 }
291
292 guint r = g_atomic_int_get (&read_idx);
293 const guint w = g_atomic_int_get (&write_idx);
294
295 guint free_cnt = (w > r) ? (w - r) : ((w - r + size) & size_mask);
296
297 if (!commit && offset > 0) {
298 if (offset > free_cnt) {
299 return 0;
300 }
301 free_cnt -= offset;
302 r = (r + offset) & size_mask;
303 }
304
305 const guint to_read = cnt > free_cnt ? free_cnt : cnt;
306
307 const guint cnt2 = r + to_read;
308
309 guint n1, n2;
310 if (cnt2 > size) {
311 n1 = size - r;
312 n2 = cnt2 & size_mask;
313 } else {
314 n1 = to_read;
315 n2 = 0;
316 }
317
318 memcpy (dest, &buf[r], n1 * sizeof (T));
319 r = (r + n1) & size_mask;
320
321 if (n2) {
322 memcpy (dest + n1, buf, n2 * sizeof (T));
323 r = n2;
324 }
325
326 if (commit) {
327 SpinLock sl (_reservation_lock);
328 g_atomic_int_set (&read_idx, r);
329 g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + to_read));
330 }
331 return to_read;
332 }
333
334 } /* end namespace */
335
#endif /* playback_buffer_h */
337