1 #include "cado.h" // IWYU pragma: keep
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stddef.h>
6 #include <stdint.h>
7 #include "macros.h"
8 #include "ringbuf.h"
9 #include "portability.h"
10 
11 /* This is a hack. Define to 1 to disable */
12 #define RINGBUF_ALIGNED_RETURNS sizeof(uint32_t)
13 
/* Maximum length of a single fread() into the ring buffer when it is
   fed from a stream. Between 64 and 1024 KiB seems best. */
16 #define PREEMPT_ONE_READ        (1UL<<20)
17 
18 /* This has been tested with small buffer sizes, so as to stress the
19  * pause/resume mechanism (based on pthread conditions). It seems to
20  * work. Note though that this code heavily relies on the fact that there
21  * is only _one_ thread reading data.
22  */
23 
24 #ifndef MIN
25 #define MIN(l,o) ((l) < (o) ? (l) : (o))
26 #endif
27 
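/* Refuse to fill memory with incoming data beyond this size. A write
 * that could only be stored by growing the ringbuf capacity beyond this
 * size is paused until the reader has drained data (a single write that
 * is at least as large as the whole buffer still forces growth). */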
31 #define RINGBUF_MAX_SIZE        (1 << 26)
32 
33 /* This is only for ringbuf_get2, for the auxiliary malloc()'ed area
34  * which it returns. Note that this is not guided by efficiency needs. */
35 #define RINGBUF_READING_BUFFER_SIZE     (1 << 16)
36 
37 /* must be called with mutex locked !!! */
ringbuf_grow__(ringbuf_ptr r,size_t claim)38 static void ringbuf_grow__(ringbuf_ptr r, size_t claim)
39 {
40     size_t newalloc = r->alloc;
41     if (!claim) {
42         newalloc += newalloc / 2;
43         newalloc |= pagesize(); /* home-made wrapper */
44         newalloc++;
45     } else if (claim <= r->alloc) {
46         return;
47     } else {
48         newalloc = claim;
49         /* round to page size */
50         newalloc--;
51         newalloc |= pagesize();
52         newalloc++;
53     }
54 
55     if (r->avail_to_read) {
56         char * newp = malloc(newalloc);
57         size_t tail = r->alloc - (r->rhead - r->p);
58         ptrdiff_t head = r->avail_to_read - tail;
59         // tail + head == r->avail_to_read
60         if (head > 0) {
61             memcpy(newp, r->rhead, tail);
62             memcpy(newp + tail, r->p, head);
63         } else {
64             memcpy(newp, r->rhead, r->avail_to_read);
65         }
66         free(r->p);
67         r->p = newp;
68         r->rhead = r->p;
69         r->whead = r->p + r->avail_to_read;
70     } else {
71         r->p = realloc(r->p, newalloc);
72         r->rhead = r->p;
73         r->whead = r->p;
74     }
75     r->avail_to_write += newalloc - r->alloc;
76     r->alloc = newalloc;
77 }
78 
/* Set up r as an empty ring buffer: the whole structure is zeroed, its
 * mutex and condition variable are created, and when claim is nonzero
 * ringbuf_grow__ is asked to allocate storage for that claim.
 */
void ringbuf_init(ringbuf_ptr r, size_t claim)
{
    memset(r, 0, sizeof(ringbuf));
    pthread_mutex_init(r->mx, NULL);
    pthread_cond_init(r->bored, NULL);

    if (claim == 0)
        return;

    /* The mutex need not be held while initializing. */
    ringbuf_grow__(r, claim);
}
88 
/* Release everything owned by r: the mutex, the condition variable, the
 * auxiliary reading buffer (r->rbuf) and the main storage (r->p).
 *
 * The freed pointers are reset to NULL so that the structure does not
 * keep dangling pointers around after the call.
 */
void ringbuf_clear(ringbuf_ptr r)
{
    /* Debugging aid, disabled:
     * fprintf(stderr, "ringbuf: %d times full, %d times empty\n",
     *         r->full_count, r->empty_count);
     */
    pthread_mutex_destroy(r->mx);
    pthread_cond_destroy(r->bored);

    free(r->rbuf);
    r->rbuf = NULL;

    free(r->p);
    r->p = NULL;
    /* both heads pointed into the area that was just freed */
    r->rhead = NULL;
    r->whead = NULL;
}
100 
101 
102 
/* Append the s bytes found at p to the ring buffer r.
 *
 * While there is not enough room, either the buffer is grown, or, once
 * its allocation has reached RINGBUF_MAX_SIZE and the request is smaller
 * than the allocation, the caller sleeps on r->bored until woken up. A
 * request at least as large as the allocation forces growth beyond
 * RINGBUF_MAX_SIZE.
 *
 * Returns s (converted to int).
 */
int ringbuf_put(ringbuf_ptr r, char * p, size_t s)
{
    pthread_mutex_lock(r->mx);

    while (s > r->avail_to_write) {
        if (r->alloc >= RINGBUF_MAX_SIZE && s < r->alloc) {
            /* Room can be obtained by draining the pipe: wait for
             * that rather than growing further. */
            r->full_count++;
            pthread_cond_wait(r->bored, r->mx);
            continue;
        }
        /* Either we are still below the size limit, or no amount of
         * draining would make the request fit: enlarge the buffer. */
        ringbuf_grow__(r, 0);
    }

    /* bytes between the write head and the end of the storage area */
    size_t room_to_end = r->alloc - (r->whead - r->p);

    if (s <= room_to_end || r->avail_to_write <= room_to_end) {
        /* Single contiguous copy. */
        ASSERT(s <= r->avail_to_write);
        ASSERT(s <= room_to_end);
        memcpy(r->whead, p, s);
        r->whead += s;
    } else {
        /* Split copy: fill up to the end, then wrap to the start. */
        size_t wrapped = s - room_to_end;
        ASSERT(room_to_end > 0);
        ASSERT(s > room_to_end);
        memcpy(r->whead, p, room_to_end);
#ifndef NDEBUG
        ptrdiff_t head = r->avail_to_write - room_to_end;
        ASSERT(head > 0);
        ASSERT(wrapped <= (size_t) head);
#endif
        memcpy(r->p, p + room_to_end, wrapped);
        r->whead = r->p + wrapped;
    }

    /* A write head sitting exactly at the end wraps to the start. */
    if ((size_t) (r->whead - r->p) == r->alloc) {
        r->whead = r->p;
    }

    r->avail_to_read += s;
    r->avail_to_write -= s;

    /* Someone may be waiting for data to read. */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return s;
}
159 
/* Raise the "done" flag of r and wake up every thread sleeping on
 * r->bored. The flag is asserted not to be set already.
 */
void ringbuf_mark_done(ringbuf_ptr r)
{
    pthread_mutex_lock(r->mx);

    ASSERT(r->done == 0);
    r->done = 1;

    /* broadcast, not signal: all sleepers must notice the flag */
    pthread_cond_broadcast(r->bored);

    pthread_mutex_unlock(r->mx);
}
168 
/* Return the current value of the "done" flag of r, read while holding
 * the mutex.
 */
int ringbuf_is_done(ringbuf_ptr r)
{
    int finished;

    pthread_mutex_lock(r->mx);
    finished = r->done;
    pthread_mutex_unlock(r->mx);

    return finished;
}
176 
/* Copy at most s bytes of readable data from r into p.
 *
 * Sleeps on r->bored until at least RINGBUF_ALIGNED_RETURNS bytes are
 * readable or the buffer has been marked done. The amount delivered is
 * the smaller of s and the readable amount, cut down to a multiple of
 * RINGBUF_ALIGNED_RETURNS whenever it is at least that large. s itself
 * is asserted to be at least RINGBUF_ALIGNED_RETURNS.
 *
 * Returns the number of bytes copied; 0 means the buffer is done and
 * empty.
 */
int ringbuf_get(ringbuf_ptr r, char * p, size_t s)
{
    pthread_mutex_lock(r->mx);

    for (;;) {
        if (r->done || r->avail_to_read >= RINGBUF_ALIGNED_RETURNS)
            break;
        r->empty_count++;
        pthread_cond_wait(r->bored, r->mx);
    }
    ASSERT(r->done || r->avail_to_read >= RINGBUF_ALIGNED_RETURNS);

    if (r->done && r->avail_to_read == 0) {
        /* end of data */
        pthread_mutex_unlock(r->mx);
        return 0;
    }
    if (r->avail_to_read < RINGBUF_ALIGNED_RETURNS) {
        /* a short leftover is only acceptable at the very end */
        ASSERT(r->done);
    }

    /* bytes between the read head and the end of the storage area */
    size_t room_to_end = r->alloc - (r->rhead - r->p);

    ASSERT(s >= RINGBUF_ALIGNED_RETURNS);
    if (r->avail_to_read < s) {
        s = r->avail_to_read;
    }
    if (s >= RINGBUF_ALIGNED_RETURNS) {
        s -= s % RINGBUF_ALIGNED_RETURNS;
    }

    if (s <= room_to_end || r->avail_to_read <= room_to_end) {
        /* Single contiguous copy. */
        ASSERT(s <= room_to_end);
        memcpy(p, r->rhead, s);
        r->rhead += s;
    } else {
        /* Split copy: up to the end, then from the start. */
        size_t wrapped = s - room_to_end;
        ASSERT(room_to_end > 0);
        ASSERT(s > room_to_end);
        memcpy(p, r->rhead, room_to_end);
#ifndef NDEBUG
        ptrdiff_t head = r->avail_to_read - room_to_end;
        ASSERT(head > 0);
        ASSERT((size_t) wrapped <= (size_t) head);
#endif
        ASSERT(s >= room_to_end);
        memcpy(p + room_to_end, r->p, wrapped);
        r->rhead = r->p + wrapped;
    }

    /* A read head sitting exactly at the end wraps to the start. */
    if ((size_t) (r->rhead - r->p) == r->alloc) {
        r->rhead = r->p;
    }

    r->avail_to_write += s;
    r->avail_to_read -= s;

    /* Someone may be waiting for room to write. */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return s;
}
226 
/* Variant of ringbuf_get which can supply the destination buffer.
 *
 *  - If *p is non-NULL, this is exactly ringbuf_get(r, *p, s).
 *  - If *p is NULL (s is then expected to be 0), the data is delivered
 *    into an internal area of RINGBUF_READING_BUFFER_SIZE bytes, which
 *    is allocated on first use, stored in r->rbuf and released by
 *    ringbuf_clear. *p is set to point to that area.
 *
 * Returns what ringbuf_get returns. Failure to allocate the internal
 * area aborts through ASSERT_ALWAYS, instead of letting ringbuf_get
 * copy into a NULL pointer.
 */
int ringbuf_get2(ringbuf_ptr r, void ** p, size_t s)
{
    if (*p != NULL) {
        return ringbuf_get(r, *p, s);
    }
    ASSERT(s == 0);     // does not really make sense otherwise

    pthread_mutex_lock(r->mx);
    if (r->rbuf == NULL) {
        r->rbuf = malloc(RINGBUF_READING_BUFFER_SIZE);
        ASSERT_ALWAYS(r->rbuf != NULL);
    }
    pthread_mutex_unlock(r->mx);

    *p = r->rbuf;
    return ringbuf_get(r, r->rbuf, RINGBUF_READING_BUFFER_SIZE);
}
245 
/* Read from stream f into the ring buffer r until end of file or error,
 * by fread()ing directly into the storage area of r.
 *
 * Returns the total number of bytes read when end of file is reached,
 * or -1 if fread() returns nothing and the stream is not at end of file.
 *
 * When the buffer is full, the caller sleeps on r->bored until room is
 * available. A buffer which has no storage at all yet is grown first,
 * since no reader could ever make room in it.
 */
ssize_t ringbuf_feed_stream(ringbuf_ptr r, FILE * f)
{
    ssize_t nread = 0;

    /* We are the only thread decreasing the avail_to_write counter in
     * r. So we may keep a copy of its value, which will always be a
     * lower bound, provided that we accurately report our decreases both
     * to our local value and to the global counter.  */
    pthread_mutex_lock(r->mx);
    size_t local_w_avail = r->avail_to_write;
    pthread_mutex_unlock(r->mx);

    for( ; ; ) {
        /* Make sure our writing space in the buffer is not empty */
        if (local_w_avail == 0) {
            pthread_mutex_lock(r->mx);
            while (r->avail_to_write == 0) {
                if (r->alloc == 0) {
                    /* Nothing allocated yet: waiting would never end,
                     * as there is nothing a reader could consume. */
                    ringbuf_grow__(r, 0);
                    continue;
                }
                r->full_count++;
                pthread_cond_wait(r->bored, r->mx);
            }
            local_w_avail = r->avail_to_write;
            pthread_mutex_unlock(r->mx);
        }
        /* We may now fread() from f, but only up to the _contiguous_
         * amount which is available in the buffer. This entails some
         * intimate dialogue with the ringbuf internals, which
         * obviously isn't cool (well, in fact, this whole thing
         * could probably be considered within the ringbuf API, after
         * all ?) */
        /* We are the only thread likely to call ringbuf_grow__,
         * which is the only (internal) call tinkering with r->p (and
         * hence the validity of r->whead), so these fields are read
         * here without holding the mutex. */
        size_t tail = r->alloc - (r->whead - r->p);

        tail = MIN(tail, local_w_avail);

        /* restrict to reads of some maximum size, or we'll be too
         * long delivering data to our customers */
        tail = MIN(tail, PREEMPT_ONE_READ);

        size_t s = fread(r->whead, 1, tail, f);
        nread += s;

        if (s) {
            /* publish the freshly read bytes */
            pthread_mutex_lock(r->mx);
            r->whead += s;
            r->avail_to_read += s;
            local_w_avail = r->avail_to_write -= s;
            if ((size_t) (r->whead - r->p) == r->alloc) {
                r->whead = r->p;
            }
            /* Could be that someone is waiting for data to be read */
            pthread_cond_signal(r->bored);
            pthread_mutex_unlock(r->mx);
        } else if (feof(f)) {
            return nread;
        } else {
            return -1;
        }
    }
}
307 
308 /* Search for character c, from position (offset) bytes into the readable
309  * part of the input buffer.  Return the offset in bytes from the initial
310  * segment to the matching byte (thus an integer >= offset), or -1 if the
311  * byte could not be found.
312  *
313  * This might be called by the reader thread with the mutex unlocked,
314  * under the condition that the reading thread is unique.
315  */
ringbuf_strchr(ringbuf_ptr r,int c,size_t offset)316 int ringbuf_strchr(ringbuf_ptr r, int c, size_t offset)
317 {
318     pthread_mutex_lock(r->mx);
319     ASSERT_ALWAYS(offset <= r->avail_to_read);
320     pthread_mutex_unlock(r->mx);
321     size_t tail = r->alloc - (r->rhead - r->p);
322     int s = offset;
323     for(; (size_t) s < tail ; s++) {
324         if (r->rhead[s] == c)
325             return s;
326     }
327     tail = r->avail_to_read - s;
328     for(int t = 0 ; (size_t) t < tail ; s++,t++) {
329         if (r->p[t] == c)
330             return s;
331     }
332     return -1;
333 }
334 
/* Discard the first s readable bytes of r without copying them anywhere:
 * the read head is advanced by s (wrapping around the end of the
 * storage area) and the room is handed back to the writer.
 *
 * s must not exceed the amount of readable data; this is checked with
 * ASSERT_ALWAYS, since a larger value would make the counters wrap
 * around.
 *
 * Always returns 0.
 */
int ringbuf_skip_get(ringbuf_ptr r, size_t s)
{
    pthread_mutex_lock(r->mx);
    ASSERT_ALWAYS(s <= r->avail_to_read);

    /* New position of the read head relative to r->p. Since s is at
     * most the readable amount, one subtraction is enough to wrap. */
    size_t d = (r->rhead - r->p) + s;
    if (d >= r->alloc) {
        d -= r->alloc;
    }
    r->rhead = r->p + d;

    r->avail_to_read -= s;
    r->avail_to_write += s;

    /* Could be that someone is waiting for room to write data */
    pthread_cond_signal(r->bored);
    pthread_mutex_unlock(r->mx);
    return 0;
}
350