#include "cado.h" // IWYU pragma: keep
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stddef.h>
#include <stdint.h>
#include <pthread.h>
#include "macros.h"
#include "ringbuf.h"
#include "portability.h"

/* Returns from ringbuf_get are rounded down to a multiple of this.
 * This is a hack; define to 1 to disable the alignment. */
#define RINGBUF_ALIGNED_RETURNS sizeof(uint32_t)

/* Length of one write into the preempt buffer. Between 64KB and 1024KB
   seems best. */
#define PREEMPT_ONE_READ (1UL<<20)

/* This has been tested with small buffer sizes, so as to stress the
 * pause/resume mechanism (based on pthread conditions). It seems to
 * work. Note though that this code heavily relies on the fact that there
 * is only _one_ thread reading data.
 */
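
/* A typical round trip looks as follows (a sketch, assuming the usual
 * one-element-array typedef from ringbuf.h; process() is a placeholder
 * and error handling is omitted):
 *
 *      ringbuf r;
 *      ringbuf_init(r, 0);
 *      // in the producer thread(s):
 *      ringbuf_put(r, data, size);
 *      ringbuf_mark_done(r);       // once all data has been submitted
 *      // in the (single) consumer thread:
 *      char buf[4096];
 *      int n;
 *      while ((n = ringbuf_get(r, buf, sizeof(buf))) > 0)
 *          process(buf, n);
 *      ringbuf_clear(r);
 */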

#ifndef MIN
#define MIN(l,o) ((l) < (o) ? (l) : (o))
#endif

/* Refuse to fill memory with incoming data beyond this size. Incoming
 * data which can only be stored by expanding the ringbuf capacity beyond
 * this size makes the writer pause until the reader has drained part of
 * the buffer. */
#define RINGBUF_MAX_SIZE (1 << 26)

/* This is only for ringbuf_get2, for the auxiliary malloc()'ed area
 * which it returns. Note that this is not guided by efficiency needs. */
#define RINGBUF_READING_BUFFER_SIZE (1 << 16)

/* must be called with mutex locked !!! */
static void ringbuf_grow__(ringbuf_ptr r, size_t claim)
{
    size_t newalloc = r->alloc;
    if (!claim) {
        newalloc += newalloc / 2;
        newalloc |= pagesize();     /* home-made wrapper */
        newalloc++;
    } else if (claim <= r->alloc) {
        return;
    } else {
        newalloc = claim;
        /* round to page size */
        newalloc--;
        newalloc |= pagesize();
        newalloc++;
    }

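    /* Relocate the live data into a fresh area and linearize it: if the
     * readable bytes currently wrap around the end of the buffer, copy
     * the tail (from rhead up to the end) followed by the head (from the
     * start of the area), so that rhead ends up at offset 0. */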
    if (r->avail_to_read) {
        char * newp = malloc(newalloc);
        size_t tail = r->alloc - (r->rhead - r->p);
        ptrdiff_t head = r->avail_to_read - tail;
        // tail + head == r->avail_to_read
        if (head > 0) {
            memcpy(newp, r->rhead, tail);
            memcpy(newp + tail, r->p, head);
        } else {
            memcpy(newp, r->rhead, r->avail_to_read);
        }
        free(r->p);
        r->p = newp;
        r->rhead = r->p;
        r->whead = r->p + r->avail_to_read;
    } else {
        r->p = realloc(r->p, newalloc);
        r->rhead = r->p;
        r->whead = r->p;
    }
    r->avail_to_write += newalloc - r->alloc;
    r->alloc = newalloc;
}

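/* Initialize r. A nonzero claim pre-allocates (roughly page-rounded)
 * room for claim bytes up front. */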
void ringbuf_init(ringbuf_ptr r, size_t claim)
{
    memset(r, 0, sizeof(ringbuf));
    pthread_mutex_init(r->mx, NULL);
    pthread_cond_init(r->bored, NULL);
    if (claim) {
        ringbuf_grow__(r, claim);   /* mutex unneeded within init */
    }
}

void ringbuf_clear(ringbuf_ptr r)
{
    /*
    // fprintf(stderr, "ringbuf: %d times full, %d times empty\n",
    //         r->full_count, r->empty_count);
    */
    pthread_mutex_destroy(r->mx);
    pthread_cond_destroy(r->bored);
    free(r->rbuf);
    free(r->p);
}

int ringbuf_put(ringbuf_ptr r, char * p, size_t s)
{
    pthread_mutex_lock(r->mx);
    // fprintf(stderr, "put(%zu): (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
    for( ; s > r->avail_to_write ; ) {
        if (r->alloc >= RINGBUF_MAX_SIZE) {
            if (s < r->alloc) {
                /* Then we want to drain our pipe first. */
                // fprintf(stderr, "put(%zu): on hold (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
                r->full_count++;
                pthread_cond_wait(r->bored, r->mx);
                // fprintf(stderr, "put(%zu): resuming (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
                continue;
            } else {
                /* Here there is no hope to drain our pipe, so we must
                 * exceed our desired window size. This is not much of a
                 * problem though, since the curl reading buffer is not
                 * expected to grow too large (or so we hope...)
                 */
                // fprintf(stderr, "Warning: buffer growing beyond max size ! (previous=%zu)\n", r->alloc);
            }
        }

        ringbuf_grow__(r, 0);
    }

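    /* Copy the payload either in one contiguous chunk (when it fits
     * before the end of the allocated area) or in two pieces, wrapping
     * around to the start of the area. */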
    size_t tail = r->alloc - (r->whead - r->p);
    if (r->avail_to_write <= tail || s <= tail) {
        ASSERT(s <= r->avail_to_write);
        // s = MIN(r->avail_to_write, s);
        ASSERT(s <= tail);
        memcpy(r->whead, p, s);
        r->whead += s;
    } else {
        ASSERT(tail > 0);
        ASSERT(s > tail);
        memcpy(r->whead, p, tail);
#ifndef NDEBUG
        ptrdiff_t head = r->avail_to_write - tail;
        ASSERT(head > 0);
        ASSERT(s-tail <= (size_t) head);
#endif
        // s = tail + MIN((size_t) head, s - tail);
        memcpy(r->p, p + tail, s - tail);
        r->whead = r->p + (s-tail);
    }
    if ((size_t) (r->whead - r->p) == r->alloc) {
        r->whead = r->p;
    }
    r->avail_to_write -= s;
    r->avail_to_read += s;
    /* Could be that someone is waiting for data to be read */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return s;
}

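/* Signal the end of input: wake every waiter. Subsequent calls to
 * ringbuf_get drain whatever is left and then return 0. */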
void ringbuf_mark_done(ringbuf_ptr r)
{
    pthread_mutex_lock(r->mx);
    ASSERT(!r->done);
    r->done = 1;
    pthread_cond_broadcast(r->bored);
    pthread_mutex_unlock(r->mx);
}

int ringbuf_is_done(ringbuf_ptr r)
{
    pthread_mutex_lock(r->mx);
    int x = r->done;
    pthread_mutex_unlock(r->mx);
    return x;
}

int ringbuf_get(ringbuf_ptr r, char * p, size_t s)
{
    pthread_mutex_lock(r->mx);
    // fprintf(stderr, "get(%zu): (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
    while (!r->done && r->avail_to_read < RINGBUF_ALIGNED_RETURNS) {
        // fprintf(stderr, "get(%zu): on hold (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
        r->empty_count++;
        pthread_cond_wait(r->bored, r->mx);
        // fprintf(stderr, "get(%zu): resumed (ravail: %zu, wavail: %zu)\n", s, r->avail_to_read, r->avail_to_write);
    }
    ASSERT(r->done || r->avail_to_read >= RINGBUF_ALIGNED_RETURNS);
    if (r->done && !r->avail_to_read) {
        pthread_mutex_unlock(r->mx);
        return 0;
    }
    if (r->avail_to_read < RINGBUF_ALIGNED_RETURNS)
        ASSERT(r->done);
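    /* Truncate the request to what is currently readable, then round it
     * down to a multiple of RINGBUF_ALIGNED_RETURNS; an unaligned (short)
     * return can happen only once the producer is done. */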
    size_t tail = r->alloc - (r->rhead - r->p);
    ASSERT(s >= RINGBUF_ALIGNED_RETURNS);
    s = MIN(r->avail_to_read, s);
    if (s >= RINGBUF_ALIGNED_RETURNS) s -= s % RINGBUF_ALIGNED_RETURNS;
    if (r->avail_to_read <= tail || s <= tail) {
        ASSERT(s <= tail);
        memcpy(p, r->rhead, s);
        r->rhead += s;
    } else {
        ASSERT(tail > 0);
        ASSERT(s > tail);
        memcpy(p, r->rhead, tail);
#ifndef NDEBUG
        ptrdiff_t head = r->avail_to_read - tail;
        ASSERT(head > 0);
        ASSERT((size_t) (s - tail) <= (size_t) head);
        // s = tail + MIN((size_t) head, s - tail);
#endif
        ASSERT(s >= tail);
        memcpy(p + tail, r->p, s - tail);
        r->rhead = r->p + (s-tail);
    }
    if ((size_t) (r->rhead - r->p) == r->alloc) {
        r->rhead = r->p;
    }
    r->avail_to_read -= s;
    r->avail_to_write += s;
    /* Could be that someone is waiting for room to write data */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return s;
}

int ringbuf_get2(ringbuf_ptr r, void ** p, size_t s)
{
    if (*p) {
        return ringbuf_get(r, *p, s);
    }
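    /* *p == NULL means: use (and hand back) the ringbuf's internal
     * scratch buffer, which is allocated lazily here. */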
    ASSERT(s == 0);     // does not really make sense otherwise
    pthread_mutex_lock(r->mx);
    if (!r->rbuf) {
        r->rbuf = malloc(RINGBUF_READING_BUFFER_SIZE);
    }
    pthread_mutex_unlock(r->mx);
    /*
    if (s > RINGBUF_READING_BUFFER_SIZE || s == 0)
        s = RINGBUF_READING_BUFFER_SIZE;
    */
    *p = r->rbuf;
    return ringbuf_get(r, r->rbuf, RINGBUF_READING_BUFFER_SIZE);
}

ssize_t ringbuf_feed_stream(ringbuf_ptr r, FILE * f)
{
    ssize_t nread = 0;

    /* We are the only thread decreasing the avail_to_write counter in
     * r. So we may keep a copy of its value, which will always be a
     * lower bound, provided that we accurately report our decreases both
     * to our local value and to the global counter. */
    pthread_mutex_lock(r->mx);
    size_t local_w_avail = r->avail_to_write;
    pthread_mutex_unlock(r->mx);

    for( ; ; ) {
        /* Make sure our writing space in the buffer is not empty */
        if (local_w_avail == 0) {
            pthread_mutex_lock(r->mx);
            for( ; ! r->avail_to_write ; ) {
                r->full_count++;
                pthread_cond_wait(r->bored, r->mx);
            }
            local_w_avail = r->avail_to_write;
            pthread_mutex_unlock(r->mx);
        }
        /* We may now fread() from f, but only up to the _contiguous_
         * amount which is available in the buffer. This entails some
         * intimate dialogue with the ringbuf internals, which
         * obviously isn't cool (well, in fact, this whole thing
         * could arguably belong in the ringbuf API after all). */
        /* We are the only thread likely to call ringbuf_grow__,
         * which is the only (internal) call tinkering with r->p (and
         * hence with the validity of r->whead). */
        size_t tail = r->alloc - (r->whead - r->p);

        tail = MIN(tail, local_w_avail);

        /* restrict to reads of some maximum size, or we'll be too
         * long delivering data to our customers */
        tail = MIN(tail, PREEMPT_ONE_READ);

        size_t s = fread(r->whead, 1, tail, f);
        nread += s;

        if (s) {
            pthread_mutex_lock(r->mx);
            r->whead += s;
            r->avail_to_read += s;
            local_w_avail = r->avail_to_write -= s;
            if ((size_t) (r->whead - r->p) == r->alloc) {
                r->whead = r->p;
            }
            /* Could be that someone is waiting for data to be read */
            pthread_cond_signal(r->bored);
            pthread_mutex_unlock(r->mx);
        } else if (feof(f)) {
            return nread;
        } else {
            return -1;
        }
    }
}
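
/* A minimal producer-thread sketch built on ringbuf_feed_stream (names
 * are illustrative, not part of the API):
 *
 *      void * producer(void * arg)
 *      {
 *          ringbuf_ptr r = arg;
 *          ssize_t n = ringbuf_feed_stream(r, stdin);
 *          if (n < 0) perror("fread");
 *          ringbuf_mark_done(r);
 *          return NULL;
 *      }
 */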

/* Search for character c, starting offset bytes into the readable part
 * of the buffer. Return the offset in bytes from the beginning of the
 * readable data to the matching byte (thus an integer >= offset), or -1
 * if the byte could not be found.
 *
 * This may be called by the reader thread with the mutex unlocked,
 * provided that the reading thread is unique.
 */
int ringbuf_strchr(ringbuf_ptr r, int c, size_t offset)
{
    pthread_mutex_lock(r->mx);
    ASSERT_ALWAYS(offset <= r->avail_to_read);
    pthread_mutex_unlock(r->mx);
    size_t tail = r->alloc - (r->rhead - r->p);
    size_t s = offset;
    /* scan the contiguous part first, staying within the readable data */
    for( ; s < tail && s < r->avail_to_read ; s++) {
        if (r->rhead[s] == c)
            return s;
    }
    /* then the wrapped-around part; stream offset s maps to r->p[s - tail] */
    for( ; s < r->avail_to_read ; s++) {
        if (r->p[s - tail] == c)
            return s;
    }
    return -1;
}

int ringbuf_skip_get(ringbuf_ptr r, size_t s)
{
    pthread_mutex_lock(r->mx);
    size_t d = (r->rhead - r->p) + s;
    if (d >= r->alloc) {
        d -= r->alloc;
    }
    r->rhead = r->p + d;
    r->avail_to_read -= s;
    r->avail_to_write += s;
    /* Could be that someone is waiting for room to write data */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return 0;
}
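
/* Together, ringbuf_strchr and ringbuf_skip_get let the reader consume
 * delimited records without copying. A sketch: skip everything up to and
 * including the next newline:
 *
 *      int pos = ringbuf_strchr(r, '\n', 0);
 *      if (pos >= 0)
 *          ringbuf_skip_get(r, pos + 1);
 */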