1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdlib.h>
19 #include <stddef.h>
20 #include <string.h>
21 #include <libcouchbase/assert.h>
22 #include <stdio.h>
23 #include "rope.h"
24 
/* Fully parenthesized so the macro composes safely inside larger
 * expressions (e.g. `2 * MINIMUM(a, b)`), per CERT PRE01-C/PRE02-C.
 * NOTE: arguments are still evaluated twice; avoid side effects. */
#define MINIMUM(a, b) ((a) < (b) ? (a) : (b))
26 
/* Convenience wrappers dispatching through each segment's allocator vtable:
 * release a segment, grow a segment in place, allocate a fresh segment. */
#define SEG_RELEASE(seg) (seg)->allocator->s_release((seg)->allocator, seg)
#define SEG_REALLOC(seg, n) (seg)->allocator->s_realloc((seg)->allocator, seg, n)
#define ROPE_SALLOC(rope, n) (rope)->allocator->s_alloc((rope)->allocator, n)
/* Forward declaration: drains and releases every segment held by `rope`. */
static void wipe_rope(rdb_ROPEBUF *rope);
31 
32 unsigned
rdb_rdstart(rdb_IOROPE * ior,nb_IOV * iov,unsigned niov)33 rdb_rdstart(rdb_IOROPE *ior, nb_IOV *iov, unsigned niov)
34 {
35     unsigned orig_niov = niov;
36     unsigned cur_rdsize = 0;
37 
38     lcb_list_t *ll;
39     rdb_ROPESEG *seg = RDB_SEG_LAST(&ior->recvd);
40 
41     if (seg && RDB_SEG_SPACE(seg)) {
42         iov->iov_base = RDB_SEG_WBUF(seg);
43         iov->iov_len = RDB_SEG_SPACE(seg);
44         cur_rdsize += iov->iov_len;
45         ++iov;
46         --niov;
47         if (cur_rdsize >= ior->rdsize) {
48             return 1;
49         }
50     }
51 
52     if (!niov) {
53         return orig_niov - niov;
54     }
55 
56     ior->avail.allocator->r_reserve(
57             ior->avail.allocator, &ior->avail, ior->rdsize - cur_rdsize);
58 
59     lcb_assert(!LCB_LIST_IS_EMPTY(&ior->avail.segments));
60 
61     LCB_LIST_FOR(ll, &ior->avail.segments) {
62         rdb_ROPESEG *cur = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
63         iov->iov_base = RDB_SEG_WBUF(cur);
64         iov->iov_len = RDB_SEG_SPACE(cur);
65         ++iov;
66 
67         if (!--niov) {
68             break;
69         }
70     }
71     return orig_niov - niov;
72 }
73 
74 
75 void
rdb_rdend(rdb_IOROPE * ior,unsigned nr)76 rdb_rdend(rdb_IOROPE *ior, unsigned nr)
77 {
78     unsigned to_chop;
79     lcb_list_t *llcur, *llnext;
80 
81     /** Chop the first segment at the end, if there's space */
82     rdb_ROPESEG *seg = RDB_SEG_LAST(&ior->recvd);
83     if (seg && RDB_SEG_SPACE(seg)) {
84         to_chop = MINIMUM(nr, RDB_SEG_SPACE(seg));
85         seg->nused += to_chop;
86         ior->recvd.nused += to_chop;
87 
88         if (! (nr -= to_chop)) {
89             wipe_rope(&ior->avail);
90             return;
91         }
92     }
93 
94     LCB_LIST_SAFE_FOR(llcur, llnext, &ior->avail.segments) {
95         seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
96         to_chop = MINIMUM(nr, RDB_SEG_SPACE(seg));
97 
98         seg->nused += to_chop;
99         ior->recvd.nused += seg->nused;
100 
101         lcb_list_delete(&seg->llnode);
102         lcb_list_append(&ior->recvd.segments, &seg->llnode);
103         if (! (nr -= to_chop)) {
104             wipe_rope(&ior->avail);
105             return;
106         }
107     }
108 
109     /** Reads didn't fit into any segment */
110     fprintf(stderr, "RDB: Tried to consume more than available in the buffer (n=%u)\n", nr);
111     lcb_assert(0);
112 }
113 
114 static void
seg_consumed(rdb_ROPEBUF * rope,rdb_ROPESEG * seg,unsigned nr)115 seg_consumed(rdb_ROPEBUF *rope, rdb_ROPESEG *seg, unsigned nr)
116 {
117     lcb_assert(nr <= seg->nused);
118     seg->nused -= nr;
119     seg->start += nr;
120     rope->nused -= nr;
121 
122     if (!seg->nused) {
123         lcb_list_delete(&seg->llnode);
124         seg->shflags &= ~RDB_ROPESEG_F_LIB;
125         if (rdb_seg_recyclable(seg)) {
126             SEG_RELEASE(seg);
127         }
128     }
129 }
130 
131 static void
rope_consumed(rdb_ROPEBUF * rope,unsigned nr)132 rope_consumed(rdb_ROPEBUF *rope, unsigned nr)
133 {
134     lcb_list_t *llcur, *llnext;
135     lcb_assert(nr <= rope->nused);
136 
137     LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
138         unsigned to_chop;
139         rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
140         to_chop = MINIMUM(nr, seg->nused);
141         seg_consumed(rope, seg, to_chop);
142 
143         if (! (nr -= to_chop)) {
144             break;
145         }
146     }
147 }
148 
149 void
rdb_consumed(rdb_IOROPE * ior,unsigned nr)150 rdb_consumed(rdb_IOROPE *ior, unsigned nr)
151 {
152     rope_consumed(&ior->recvd, nr);
153 }
154 
155 static void
try_compact(rdb_ROPESEG * seg)156 try_compact(rdb_ROPESEG *seg)
157 {
158     /** Can't move stuff around.. */
159     char *cp_end;
160     if (!rdb_seg_recyclable(seg)) {
161         return;
162     }
163 
164     /**
165      * Copy only if:
166      * (1) Waste in the beginning is >= nalloc/2
167      * (3) There is no overlap between the two (i.e. memcpy)
168      */
169     if (seg->start < seg->nalloc / 2) {
170         return;
171     }
172 
173     cp_end = seg->root + seg->nused;
174     if (seg->root + seg->start < cp_end) {
175         /** Overlap */
176         return;
177     }
178 
179     memcpy(seg->root, seg->root + seg->start, seg->nused);
180     seg->start = 0;
181 }
182 
/**
 * Ensure the first `nr` bytes of the rope are contiguous within a single
 * segment, copying data forward out of subsequent segments as needed.
 * No-op if the head segment can already hold `nr` bytes, or if `nr` < 2.
 */
static void
rope_consolidate(rdb_ROPEBUF *rope, unsigned nr)
{
    rdb_ROPESEG *seg, *newseg;
    lcb_list_t *llcur, *llnext;

    seg = RDB_SEG_FIRST(rope);
    /* Head's data plus its free tail already covers `nr`: nothing to do */
    if (seg->nused + RDB_SEG_SPACE(seg) >= nr || nr < 2) {
        return;
    }

    try_compact(seg);
    /* Detach the head; it (or its replacement) is re-prepended below */
    lcb_list_delete(&seg->llnode);

    if (rdb_seg_recyclable(seg)) {
        /* Grow in place; account for dead space before `start` so `nr`
         * usable bytes fit after the existing offset */
        unsigned to_alloc = nr + seg->start;
        newseg = SEG_REALLOC(seg, to_alloc);
        /* We re-add it back after traversal */
    } else {
        /* Segment is pinned by a user reference: allocate a fresh segment
         * and copy the head's live data into it */
        newseg = ROPE_SALLOC(rope, nr);
        memcpy(RDB_SEG_WBUF(newseg), RDB_SEG_RBUF(seg), seg->nused);
        newseg->nused = seg->nused;
        /* "Free" it. Since this buffer is in use, we just unset our own flag */
        seg->shflags &= ~RDB_ROPESEG_F_LIB;
    }

    /* Temporarily uncount newseg's bytes; seg_consumed() below decrements
     * rope->nused for each byte copied out of the tail segments, and the
     * full total is re-added once newseg is back on the list */
    rope->nused -= newseg->nused;
    nr -= newseg->nused;

    LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
        unsigned to_copy;
        seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
        to_copy = MINIMUM(nr, seg->nused);

        /* Append this segment's contribution to newseg's write position */
        memcpy(RDB_SEG_WBUF(newseg), RDB_SEG_RBUF(seg), to_copy);
        newseg->nused += to_copy;

        /* Drop the copied bytes (releases the segment once empty) */
        seg_consumed(rope, seg, to_copy);
        if (! (nr -= to_copy)) {
            break;
        }
    }

    lcb_list_prepend(&rope->segments, &newseg->llnode);
    rope->nused += newseg->nused;
    lcb_assert(rope->nused >= nr);
}
230 
231 void
rdb_consolidate(rdb_IOROPE * ior,unsigned nr)232 rdb_consolidate(rdb_IOROPE *ior, unsigned nr)
233 {
234     rope_consolidate(&ior->recvd, nr);
235 }
236 
237 void
rdb_copyread(rdb_IOROPE * ior,void * tgt,unsigned n)238 rdb_copyread(rdb_IOROPE *ior, void *tgt, unsigned n)
239 {
240     lcb_list_t *ll;
241     char *p = tgt;
242 
243     LCB_LIST_FOR(ll, &ior->recvd.segments) {
244         rdb_ROPESEG *seg = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
245         unsigned to_copy = MINIMUM(seg->nused, n);
246         memcpy(p, RDB_SEG_RBUF(seg), to_copy);
247         p += to_copy;
248         n -= to_copy;
249         if (!n) {
250             break;
251         }
252     }
253 }
254 
255 int
rdb_refread_ex(rdb_IOROPE * ior,nb_IOV * iov,rdb_ROPESEG ** segs,unsigned nelem,unsigned ndata)256 rdb_refread_ex(rdb_IOROPE *ior, nb_IOV *iov, rdb_ROPESEG **segs,
257                unsigned nelem, unsigned ndata)
258 {
259     unsigned orig_nelem = nelem;
260     lcb_list_t *ll;
261     LCB_LIST_FOR(ll, &ior->recvd.segments) {
262         rdb_ROPESEG *seg = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
263         unsigned cur_len = MINIMUM(ndata, seg->nused);
264         iov->iov_len = cur_len;
265         iov->iov_base = RDB_SEG_RBUF(seg);
266         *segs = seg;
267 
268         ++iov;
269         ++segs;
270         --nelem;
271 
272         ndata -= cur_len;
273 
274         if (!ndata) {
275             return orig_nelem - nelem;
276         }
277 
278         if (!nelem) {
279             return -1;
280         }
281     }
282 
283     /** Requested more data than we have */
284     fprintf(stderr, "RDB: refread_ex was passed a size greater than our buffer (n=%u)\n", ndata);
285     return -1;
286 }
287 
288 unsigned
rdb_get_contigsize(rdb_IOROPE * ior)289 rdb_get_contigsize(rdb_IOROPE *ior)
290 {
291     rdb_ROPESEG *seg = RDB_SEG_FIRST(&ior->recvd);
292     if (!seg) {
293         return 0;
294     }
295     return seg->nused;
296 }
297 
298 char *
rdb_get_consolidated(rdb_IOROPE * ior,unsigned n)299 rdb_get_consolidated(rdb_IOROPE *ior, unsigned n)
300 {
301     lcb_assert(ior->recvd.nused >= n);
302     rdb_consolidate(ior, n);
303     return RDB_SEG_RBUF(RDB_SEG_FIRST(&ior->recvd));
304 }
305 
306 void
rdb_seg_ref(rdb_ROPESEG * seg)307 rdb_seg_ref(rdb_ROPESEG *seg)
308 {
309     seg->refcnt++;
310     seg->shflags |= RDB_ROPESEG_F_USER;
311 }
312 
313 void
rdb_seg_unref(rdb_ROPESEG * seg)314 rdb_seg_unref(rdb_ROPESEG *seg)
315 {
316     if (--seg->refcnt) {
317         return;
318     }
319     seg->shflags &= ~RDB_ROPESEG_F_USER;
320     if (seg->shflags & RDB_ROPESEG_F_LIB) {
321         return;
322     }
323     SEG_RELEASE(seg);
324 }
325 
326 void
rdb_init(rdb_IOROPE * ior,rdb_ALLOCATOR * alloc)327 rdb_init(rdb_IOROPE *ior, rdb_ALLOCATOR *alloc)
328 {
329     memset(ior, 0, sizeof(*ior));
330     lcb_list_init(&ior->recvd.segments);
331     lcb_list_init(&ior->avail.segments);
332     rdb_challoc(ior, alloc);
333     ior->rdsize = 32768;
334 }
335 
336 static void
wipe_rope(rdb_ROPEBUF * rope)337 wipe_rope(rdb_ROPEBUF *rope)
338 {
339     lcb_list_t *llcur, *llnext;
340     LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
341         rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
342         seg_consumed(rope, seg, seg->nused);
343     }
344 }
345 
346 void
rdb_cleanup(rdb_IOROPE * ior)347 rdb_cleanup(rdb_IOROPE *ior)
348 {
349     wipe_rope(&ior->recvd);
350     wipe_rope(&ior->avail);
351     ior->recvd.allocator->a_release(ior->recvd.allocator);
352 }
353 
354 void
rdb_challoc(rdb_IOROPE * ior,rdb_ALLOCATOR * alloc)355 rdb_challoc(rdb_IOROPE *ior, rdb_ALLOCATOR *alloc)
356 {
357     if (ior->recvd.allocator) {
358         ior->recvd.allocator->a_release(ior->recvd.allocator);
359     }
360 
361     ior->recvd.allocator = alloc;
362     ior->avail.allocator = alloc;
363 }
364 
365 void
rdb_copywrite(rdb_IOROPE * ior,void * buf,unsigned nbuf)366 rdb_copywrite(rdb_IOROPE *ior, void *buf, unsigned nbuf)
367 {
368     char *cur = buf;
369 
370     while (nbuf) {
371         unsigned ii;
372         unsigned orig_nbuf = nbuf;
373         nb_IOV iov[32];
374         unsigned niov;
375 
376         niov = rdb_rdstart(ior, iov, 32);
377         for (ii = 0; ii < niov && nbuf; ii++) {
378             unsigned to_copy = MINIMUM(nbuf, iov[ii].iov_len);
379             memcpy(iov[ii].iov_base, cur, to_copy);
380             cur += to_copy;
381             nbuf -= to_copy;
382         }
383         rdb_rdend(ior, orig_nbuf - nbuf);
384     }
385 }
386 
387 static void
dump_ropebuf(const rdb_ROPEBUF * buf,FILE * fp)388 dump_ropebuf(const rdb_ROPEBUF *buf, FILE *fp)
389 {
390     lcb_list_t *llcur;
391     fprintf(fp, "TOTAL LENGTH: %u\n", buf->nused);
392     fprintf(fp, "WILL DUMP SEGMENTS..\n");
393     LCB_LIST_FOR(llcur, &buf->segments) {
394         const char *indent = "    ";
395         rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
396         fprintf(fp, "%sSEG=%p\n", indent, (void*)seg);
397         fprintf(fp, "%sALLOCATOR=%p [%u]\n", indent, (void*)seg->allocator, seg->allocid);
398         fprintf(fp, "%sBUFROOT=%p\n", indent, (void *)seg->root);
399         fprintf(fp, "%sALLOC SIZE: %u\n", indent, seg->nalloc);
400         fprintf(fp, "%sDATA SIZE: %u\n", indent, seg->nused);
401         fprintf(fp, "%sDATS OFFSET: %u\n", indent, seg->start);
402         fprintf(fp, "%sSEG FLAGS: 0x%x\n", indent, seg->shflags);
403         fprintf(fp, "%sSEG REFCNT: %u\n", indent, seg->refcnt);
404         fprintf(fp, "\n");
405     }
406 }
407 
408 void
rdb_dump(const rdb_IOROPE * ior,FILE * fp)409 rdb_dump(const rdb_IOROPE *ior, FILE *fp)
410 {
411     fprintf(fp, "@@ DUMP IOROPE=%p\n", (void*)ior);
412     fprintf(fp, "@@ ROPEBUF[AVAIL]=%p\n", (void*)&ior->avail);
413     dump_ropebuf(&ior->avail, fp);
414     fprintf(fp, "@@ ROPEBUF[ACTIVE]=%p\n", (void*)&ior->recvd);
415     dump_ropebuf(&ior->recvd, fp);
416     if (ior->avail.allocator && ior->avail.allocator->dump) {
417         ior->avail.allocator->dump(ior->avail.allocator, fp);
418     }
419 }
420