1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2017 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #include "rd.h"
31 #include "rdbuf.h"
32 #include "rdunittest.h"
33 #include "rdlog.h"
34 #include "rdcrc32.h"
35 #include "crc32c.h"
36 
37 
38 static size_t
39 rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p);
40 
41 
42 /**
43  * @brief Destroy the segment and free its payload.
44  *
45  * @remark Will NOT unlink from buffer.
46  */
47 static void rd_segment_destroy (rd_segment_t *seg) {
48         /* Free payload */
49         if (seg->seg_free && seg->seg_p)
50                 seg->seg_free(seg->seg_p);
51 
52         if (seg->seg_flags & RD_SEGMENT_F_FREE)
53                 rd_free(seg);
54 }
55 
56 /**
57  * @brief Initialize segment with absolute offset, backing memory pointer,
58  *        and backing memory size.
59  * @remark The segment is NOT linked.
60  */
61 static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
62         memset(seg, 0, sizeof(*seg));
63         seg->seg_p     = mem;
64         seg->seg_size  = size;
65 }
66 
67 
68 /**
69  * @brief Append segment to buffer
70  *
71  * @remark Will set the buffer position to the new \p seg if no existing wpos.
72  * @remark Will set the segment seg_absof to the current length of the buffer.
73  */
74 static rd_segment_t *rd_buf_append_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
75         TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link);
76         rbuf->rbuf_segment_cnt++;
77         seg->seg_absof      = rbuf->rbuf_len;
78         rbuf->rbuf_len     += seg->seg_of;
79         rbuf->rbuf_size    += seg->seg_size;
80 
81         /* Update writable position */
82         if (!rbuf->rbuf_wpos)
83                 rbuf->rbuf_wpos = seg;
84         else
85                 rd_buf_get_writable0(rbuf, NULL, NULL);
86 
87         return seg;
88 }
89 
90 
91 
92 
93 /**
94  * @brief Attempt to allocate \p size bytes from the buffers extra buffers.
95  * @returns the allocated pointer which MUST NOT be freed, or NULL if
96  *          not enough memory.
97  * @remark the returned pointer is memory-aligned to be safe.
98  */
99 static void *extra_alloc (rd_buf_t *rbuf, size_t size) {
100         size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */
101         void *p;
102 
103         if (of + size > rbuf->rbuf_extra_size)
104                 return NULL;
105 
106         p = rbuf->rbuf_extra + of; /* Aligned pointer */
107 
108         rbuf->rbuf_extra_len = of + size;
109 
110         return p;
111 }
112 
113 
114 
115 /**
116  * @brief Get a pre-allocated segment if available, or allocate a new
117  *        segment with the extra amount of \p size bytes allocated for payload.
118  *
119  *        Will not append the segment to the buffer.
120  */
121 static rd_segment_t *
122 rd_buf_alloc_segment0 (rd_buf_t *rbuf, size_t size) {
123         rd_segment_t *seg;
124 
125         /* See if there is enough room in the extra buffer for
126          * allocating the segment header and the buffer,
127          * or just the segment header, else fall back to malloc. */
128         if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) {
129                 rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
130 
131         } else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) {
132                 rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size);
133                 if (size > 0)
134                         seg->seg_free = rd_free;
135 
136         } else if ((seg = rd_malloc(sizeof(*seg) + size))) {
137                 rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
138                 seg->seg_flags |= RD_SEGMENT_F_FREE;
139 
140         } else
141                 rd_assert(!*"segment allocation failure");
142 
143         return seg;
144 }
145 
146 /**
147  * @brief Allocate between \p min_size .. \p max_size of backing memory
148  *        and add it as a new segment to the buffer.
149  *
150  *        The buffer position is updated to point to the new segment.
151  *
152  *        The segment will be over-allocated if permitted by max_size
153  *        (max_size == 0 or max_size > min_size).
154  */
155 static rd_segment_t *
156 rd_buf_alloc_segment (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
157         rd_segment_t *seg;
158 
159         /* Over-allocate if allowed. */
160         if (min_size != max_size || max_size == 0)
161                 max_size = RD_MAX(sizeof(*seg) * 4,
162                                   RD_MAX(min_size * 2,
163                                          rbuf->rbuf_size / 2));
164 
165         seg = rd_buf_alloc_segment0(rbuf, max_size);
166 
167         rd_buf_append_segment(rbuf, seg);
168 
169         return seg;
170 }
171 
172 
173 /**
174  * @brief Ensures that \p size bytes will be available
175  *        for writing and the position will be updated to point to the
176  *        start of this contiguous block.
177  */
178 void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
179         rd_segment_t *seg = rbuf->rbuf_wpos;
180 
181         if (seg) {
182                 void *p;
183                 size_t remains = rd_segment_write_remains(seg, &p);
184 
185                 if (remains >= size)
186                         return; /* Existing segment has enough space. */
187 
188                 /* Future optimization:
189                  * If existing segment has enough remaining space to warrant
190                  * a split, do it, before allocating a new one. */
191         }
192 
193         /* Allocate new segment */
194         rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
195 }
196 
197 /**
198  * @brief Ensures that at least \p size bytes will be available for
199  *        a future write.
200  *
201  *        Typically used prior to a call to rd_buf_get_write_iov()
202  */
203 void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
204         size_t remains;
205         while ((remains = rd_buf_write_remains(rbuf)) < min_size)
206                 rd_buf_alloc_segment(rbuf,
207                                      min_size - remains,
208                                      max_size ? max_size - remains : 0);
209 }
210 
211 
212 /**
213  * @returns the segment at absolute offset \p absof, or NULL if out of range.
214  *
215  * @remark \p hint is an optional segment where to start looking, such as
216  *         the current write or read position.
217  */
218 rd_segment_t *
219 rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
220                               size_t absof) {
221         const rd_segment_t *seg = hint;
222 
223         if (unlikely(absof >= rbuf->rbuf_len))
224                 return NULL;
225 
226         /* Only use current write position if possible and if it helps */
227         if (!seg || absof < seg->seg_absof)
228                 seg = TAILQ_FIRST(&rbuf->rbuf_segments);
229 
230         do {
231                 if (absof >= seg->seg_absof &&
232                     absof < seg->seg_absof + seg->seg_of) {
233                         rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
234                         return (rd_segment_t *)seg;
235                 }
236         } while ((seg = TAILQ_NEXT(seg, seg_link)));
237 
238         return NULL;
239 }
240 
241 
242 /**
243  * @brief Split segment \p seg at absolute offset \p absof, appending
244  *        a new segment after \p seg with its memory pointing to the
245  *        memory starting at \p absof.
246  *        \p seg 's memory will be shorted to the \p absof.
247  *
248  *        The new segment is NOT appended to the buffer.
249  *
250  * @warning MUST ONLY be used on the LAST segment
251  *
252  * @warning if a segment is inserted between these two splitted parts
253  *          it is imperative that the later segment's absof is corrected.
254  *
255  * @remark The seg_free callback is retained on the original \p seg
256  *         and is not copied to the new segment, but flags are copied.
257  */
258 static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
259                                        size_t absof) {
260         rd_segment_t *newseg;
261         size_t relof;
262 
263         rd_assert(seg == rbuf->rbuf_wpos);
264         rd_assert(absof >= seg->seg_absof &&
265                   absof <= seg->seg_absof + seg->seg_of);
266 
267         relof = absof - seg->seg_absof;
268 
269         newseg = rd_buf_alloc_segment0(rbuf, 0);
270 
271         /* Add later part of split bytes to new segment */
272         newseg->seg_p      = seg->seg_p+relof;
273         newseg->seg_of     = seg->seg_of-relof;
274         newseg->seg_size   = seg->seg_size-relof;
275         newseg->seg_absof  = SIZE_MAX; /* Invalid */
276         newseg->seg_flags |= seg->seg_flags;
277 
278         /* Remove earlier part of split bytes from previous segment */
279         seg->seg_of        = relof;
280         seg->seg_size      = relof;
281 
282         /* newseg's length will be added to rbuf_len in append_segment(),
283          * so shave it off here from seg's perspective. */
284         rbuf->rbuf_len   -= newseg->seg_of;
285         rbuf->rbuf_size  -= newseg->seg_size;
286 
287         return newseg;
288 }
289 
290 
291 
292 
293 /**
294  * @brief Unlink and destroy a segment, updating the \p rbuf
295  *        with the decrease in length and capacity.
296  */
297 static void rd_buf_destroy_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
298         rd_assert(rbuf->rbuf_segment_cnt > 0 &&
299                   rbuf->rbuf_len >= seg->seg_of &&
300                   rbuf->rbuf_size >= seg->seg_size);
301 
302         TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link);
303         rbuf->rbuf_segment_cnt--;
304         rbuf->rbuf_len  -= seg->seg_of;
305         rbuf->rbuf_size -= seg->seg_size;
306         if (rbuf->rbuf_wpos == seg)
307                 rbuf->rbuf_wpos = NULL;
308 
309         rd_segment_destroy(seg);
310 }
311 
312 
313 /**
314  * @brief Free memory associated with the \p rbuf, but not the rbuf itself.
315  *        Segments will be destroyed.
316  */
317 void rd_buf_destroy (rd_buf_t *rbuf) {
318         rd_segment_t *seg, *tmp;
319 
320 #if ENABLE_DEVEL
321         /* FIXME */
322         if (rbuf->rbuf_len > 0 && 0) {
323                 size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len;
324                 float fill_grade = (float)rbuf->rbuf_len /
325                         (float)rbuf->rbuf_size;
326 
327                 printf("fill grade: %.2f%% (%"PRIusz" bytes over-allocated)\n",
328                        fill_grade * 100.0f, overalloc);
329         }
330 #endif
331 
332 
333         TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) {
334                 rd_segment_destroy(seg);
335 
336         }
337 
338         if (rbuf->rbuf_extra)
339                 rd_free(rbuf->rbuf_extra);
340 }
341 
342 
343 /**
344  * @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments
345  *        where the first segment will have a \p buf_size of backing memory.
346  *
347  *        The caller may rearrange the backing memory as it see fits.
348  */
349 void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
350         size_t totalloc = 0;
351 
352         memset(rbuf, 0, sizeof(*rbuf));
353         TAILQ_INIT(&rbuf->rbuf_segments);
354 
355         if (!fixed_seg_cnt) {
356                 assert(!buf_size);
357                 return;
358         }
359 
360         /* Pre-allocate memory for a fixed set of segments that are known
361          * before-hand, to minimize the number of extra allocations
362          * needed for well-known layouts (such as headers, etc) */
363         totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;
364 
365         /* Pre-allocate extra space for the backing buffer. */
366         totalloc += buf_size;
367 
368         rbuf->rbuf_extra_size = totalloc;
369         rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
370 }
371 
372 
373 
374 
375 /**
376  * @brief Convenience writer iterator interface.
377  *
378  *        After writing to \p p the caller must update the written length
379  *        by calling rd_buf_write(rbuf, NULL, written_length)
380  *
381  * @returns the number of contiguous writable bytes in segment
382  *          and sets \p *p to point to the start of the memory region.
383  */
384 static size_t
385 rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
386         rd_segment_t *seg;
387 
388         for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
389                 size_t len = rd_segment_write_remains(seg, p);
390 
391                 /* Even though the write offset hasn't changed we
392                  * avoid future segment scans by adjusting the
393                  * wpos here to the first writable segment. */
394                 rbuf->rbuf_wpos = seg;
395                 if (segp)
396                         *segp = seg;
397 
398                 if (unlikely(len == 0))
399                         continue;
400 
401                 /* Also adjust absof if the segment was allocated
402                  * before the previous segment's memory was exhausted
403                  * and thus now might have a lower absolute offset
404                  * than the previos segment's now higher relative offset. */
405                 if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
406                         seg->seg_absof = rbuf->rbuf_len;
407 
408                 return len;
409         }
410 
411         return 0;
412 }
413 
414 size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p) {
415         rd_segment_t *seg;
416         return rd_buf_get_writable0(rbuf, &seg, p);
417 }
418 
419 
420 
421 
422 /**
423  * @brief Write \p payload of \p size bytes to current position
424  *        in buffer. A new segment will be allocated and appended
425  *        if needed.
426  *
427  * @returns the write position where payload was written (pre-write).
428  *          Returning the pre-positition allows write_update() to later
429  *          update the same location, effectively making write()s
430  *          also a place-holder mechanism.
431  *
432  * @remark If \p payload is NULL only the write position is updated,
433  *         in this mode it is required for the buffer to have enough
434  *         memory for the NULL write (as it would otherwise cause
435  *         uninitialized memory in any new segments allocated from this
436  *         function).
437  */
438 size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
439         size_t remains = size;
440         size_t initial_absof;
441         const char *psrc = (const char *)payload;
442 
443         initial_absof = rbuf->rbuf_len;
444 
445         /* Ensure enough space by pre-allocating segments. */
446         rd_buf_write_ensure(rbuf, size, 0);
447 
448         while (remains > 0) {
449                 void *p = NULL;
450                 rd_segment_t *seg = NULL;
451                 size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
452                 size_t wlen = RD_MIN(remains, segremains);
453 
454                 rd_dassert(seg == rbuf->rbuf_wpos);
455                 rd_dassert(wlen > 0);
456                 rd_dassert(seg->seg_p+seg->seg_of <= (char *)p &&
457                            (char *)p < seg->seg_p+seg->seg_size);
458 
459                 if (payload) {
460                         memcpy(p, psrc, wlen);
461                         psrc += wlen;
462                 }
463 
464                 seg->seg_of    += wlen;
465                 rbuf->rbuf_len += wlen;
466                 remains        -= wlen;
467         }
468 
469         rd_assert(remains == 0);
470 
471         return initial_absof;
472 }
473 
474 
475 
476 /**
477  * @brief Write \p slice to \p rbuf
478  *
479  * @remark The slice position will be updated.
480  *
481  * @returns the number of bytes witten (always slice length)
482  */
483 size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice) {
484         const void *p;
485         size_t rlen;
486         size_t sum = 0;
487 
488         while ((rlen = rd_slice_reader(slice, &p))) {
489                 size_t r;
490                 r = rd_buf_write(rbuf, p, rlen);
491                 rd_dassert(r != 0);
492                 sum += r;
493         }
494 
495         return sum;
496 }
497 
498 
499 
500 /**
501  * @brief Write \p payload of \p size at absolute offset \p absof
502  *        WITHOUT updating the total buffer length.
503  *
504  *        This is used to update a previously written region, such
505  *        as updating the header length.
506  *
507  * @returns the number of bytes written, which may be less than \p size
508  *          if the update spans multiple segments.
509  */
510 static size_t rd_segment_write_update (rd_segment_t *seg, size_t absof,
511                                        const void *payload, size_t size) {
512         size_t relof;
513         size_t wlen;
514 
515         rd_dassert(absof >= seg->seg_absof);
516         relof = absof - seg->seg_absof;
517         rd_assert(relof <= seg->seg_of);
518         wlen = RD_MIN(size, seg->seg_of - relof);
519         rd_dassert(relof + wlen <= seg->seg_of);
520 
521         memcpy(seg->seg_p+relof, payload, wlen);
522 
523         return wlen;
524 }
525 
526 
527 
528 /**
529  * @brief Write \p payload of \p size at absolute offset \p absof
530  *        WITHOUT updating the total buffer length.
531  *
532  *        This is used to update a previously written region, such
533  *        as updating the header length.
534  */
535 size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
536                             const void *payload, size_t size) {
537         rd_segment_t *seg;
538         const char *psrc = (const char *)payload;
539         size_t of;
540 
541         /* Find segment for offset */
542         seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
543         rd_assert(seg && *"invalid absolute offset");
544 
545         for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
546                 rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
547                 size_t wlen = rd_segment_write_update(seg, absof+of,
548                                                       psrc+of, size-of);
549                 of += wlen;
550         }
551 
552         rd_dassert(of == size);
553 
554         return of;
555 }
556 
557 
558 
559 /**
560  * @brief Push reference memory segment to current write position.
561  */
562 void rd_buf_push0 (rd_buf_t *rbuf, const void *payload, size_t size,
563                    void (*free_cb)(void *), rd_bool_t writable) {
564         rd_segment_t *prevseg, *seg, *tailseg = NULL;
565 
566         if ((prevseg = rbuf->rbuf_wpos) &&
567             rd_segment_write_remains(prevseg, NULL) > 0) {
568                 /* If the current segment still has room in it split it
569                  * and insert the pushed segment in the middle (below). */
570                 tailseg = rd_segment_split(rbuf, prevseg,
571                                            prevseg->seg_absof +
572                                            prevseg->seg_of);
573         }
574 
575         seg = rd_buf_alloc_segment0(rbuf, 0);
576         seg->seg_p      = (char *)payload;
577         seg->seg_size   = size;
578         seg->seg_of     = size;
579         seg->seg_free   = free_cb;
580         if (!writable)
581                 seg->seg_flags |= RD_SEGMENT_F_RDONLY;
582 
583         rd_buf_append_segment(rbuf, seg);
584 
585         if (tailseg)
586                 rd_buf_append_segment(rbuf, tailseg);
587 }
588 
589 
590 
591 /**
592  * @brief Erase \p size bytes at \p absof from buffer.
593  *
594  * @returns the number of bytes erased.
595  *
596  * @remark This is costly since it forces a memory move.
597  */
598 size_t rd_buf_erase (rd_buf_t *rbuf, size_t absof, size_t size) {
599         rd_segment_t *seg, *next = NULL;
600         size_t of;
601 
602         /* Find segment for offset */
603         seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof);
604 
605         /* Adjust segments until size is exhausted, then continue scanning to
606          * update the absolute offset. */
607         for (of = 0 ; seg && of < size ; seg = next) {
608                 /* Example:
609                  *   seg_absof = 10
610                  *   seg_of    = 7
611                  *   absof     = 12
612                  *   of        = 1
613                  *   size      = 4
614                  *
615                  * rof          = 3   relative segment offset where to erase
616                  * eraseremains = 3   remaining bytes to erase
617                  * toerase      = 3   available bytes to erase in segment
618                  * segremains   = 1   remaining bytes in segment after to
619                  *                    the right of the erased part, i.e.,
620                  *                    the memory that needs to be moved to the
621                  *                    left.
622                  */
623                 /** Relative offset in segment for the absolute offset */
624                 size_t rof = (absof + of) - seg->seg_absof;
625                 /** How much remains to be erased */
626                 size_t eraseremains = size - of;
627                 /** How much can be erased from this segment */
628                 size_t toerase = RD_MIN(seg->seg_of - rof, eraseremains);
629                 /** How much remains in the segment after the erased part */
630                 size_t segremains = seg->seg_of - (rof + toerase);
631 
632                 next = TAILQ_NEXT(seg, seg_link);
633 
634                 seg->seg_absof -= of;
635 
636                 if (unlikely(toerase == 0))
637                         continue;
638 
639                 if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
640                         RD_BUG("rd_buf_erase() called on read-only segment");
641 
642                 if (likely(segremains > 0))
643                         memmove(seg->seg_p+rof, seg->seg_p+rof+toerase,
644                                 segremains);
645 
646                 seg->seg_of -= toerase;
647                 rbuf->rbuf_len -= toerase;
648 
649                 of += toerase;
650 
651                 /* If segment is now empty, remove it */
652                 if (seg->seg_of == 0)
653                         rd_buf_destroy_segment(rbuf, seg);
654         }
655 
656         /* Update absolute offset of remaining segments */
657         for (seg = next ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
658                 rd_assert(seg->seg_absof >= of);
659                 seg->seg_absof -= of;
660         }
661 
662         rbuf->rbuf_erased += of;
663 
664         return of;
665 }
666 
667 
668 
669 
670 /**
671  * @brief Do a write-seek, updating the write position to the given
672  *        absolute \p absof.
673  *
674  * @warning Any sub-sequent segments will be destroyed.
675  *
676  * @returns -1 if the offset is out of bounds, else 0.
677  */
678 int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
679         rd_segment_t *seg, *next;
680         size_t relof;
681 
682         seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
683         if (unlikely(!seg))
684                 return -1;
685 
686         relof = absof - seg->seg_absof;
687         if (unlikely(relof > seg->seg_of))
688                 return -1;
689 
690         /* Destroy sub-sequent segments in reverse order so that
691          * destroy_segment() length checks are correct.
692          * Will decrement rbuf_len et.al. */
693         for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
694              next != seg ; ) {
695                 rd_segment_t *this = next;
696                 next = TAILQ_PREV(this, rd_segment_head, seg_link);
697                 rd_buf_destroy_segment(rbuf, this);
698         }
699 
700         /* Update relative write offset */
701         seg->seg_of         = relof;
702         rbuf->rbuf_wpos     = seg;
703         rbuf->rbuf_len      = seg->seg_absof + seg->seg_of;
704 
705         rd_assert(rbuf->rbuf_len == absof);
706 
707         return 0;
708 }
709 
710 
711 /**
712  * @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable
713  *        segments from the buffer's current write position.
714  *
715  * @param iovcntp will be set to the number of populated \p iovs[]
716  * @param size_max limits the total number of bytes made available.
717  *                 Note: this value may be overshot with the size of one
718  *                       segment.
719  *
720  * @returns the total number of bytes in the represented segments.
721  *
722  * @remark the write position will NOT be updated.
723  */
724 size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
725                              struct iovec *iovs, size_t *iovcntp,
726                              size_t iov_max, size_t size_max) {
727         const rd_segment_t *seg;
728         size_t iovcnt = 0;
729         size_t sum = 0;
730 
731         for (seg = rbuf->rbuf_wpos ;
732              seg && iovcnt < iov_max && sum < size_max ;
733              seg = TAILQ_NEXT(seg, seg_link)) {
734                 size_t len;
735                 void *p;
736 
737                 len = rd_segment_write_remains(seg, &p);
738                 if (unlikely(len == 0))
739                         continue;
740 
741                 iovs[iovcnt].iov_base  = p;
742                 iovs[iovcnt++].iov_len = len;
743 
744                 sum += len;
745         }
746 
747         *iovcntp = iovcnt;
748 
749         return sum;
750 }
751 
752 
753 
754 
755 
756 
757 
758 
759 
760 
761 
762 /**
763  * @name Slice reader interface
764  *
765  * @{
766  */
767 
768 /**
769  * @brief Initialize a new slice of \p size bytes starting at \p seg with
770  *        relative offset \p rof.
771  *
772  * @returns 0 on success or -1 if there is not at least \p size bytes available
773  *          in the buffer.
774  */
775 int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
776                            const rd_segment_t *seg, size_t rof, size_t size) {
777         /* Verify that \p size bytes are indeed available in the buffer. */
778         if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
779                 return -1;
780 
781         slice->buf    = rbuf;
782         slice->seg    = seg;
783         slice->rof    = rof;
784         slice->start  = seg->seg_absof + rof;
785         slice->end    = slice->start + size;
786 
787         rd_assert(seg->seg_absof+rof >= slice->start &&
788                   seg->seg_absof+rof <= slice->end);
789 
790         rd_assert(slice->end <= rd_buf_len(rbuf));
791 
792         return 0;
793 }
794 
795 /**
796  * @brief Initialize new slice of \p size bytes starting at offset \p absof
797  *
798  * @returns 0 on success or -1 if there is not at least \p size bytes available
799  *          in the buffer.
800  */
801 int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
802                    size_t absof, size_t size) {
803         const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
804                                                                absof);
805         if (unlikely(!seg))
806                 return -1;
807 
808         return rd_slice_init_seg(slice, rbuf, seg,
809                                  absof - seg->seg_absof, size);
810 }
811 
812 /**
813  * @brief Initialize new slice covering the full buffer \p rbuf
814  */
815 void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
816         int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
817         rd_assert(r == 0);
818 }
819 
820 
821 
822 /**
823  * @sa rd_slice_reader() rd_slice_peeker()
824  */
825 size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
826         size_t rof = slice->rof;
827         size_t rlen;
828         const rd_segment_t *seg;
829 
830         /* Find segment with non-zero payload */
831         for (seg = slice->seg ;
832              seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
833              seg = TAILQ_NEXT(seg, seg_link))
834                 rof = 0;
835 
836         if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
837                 return 0;
838 
839         *p   = (const void *)(seg->seg_p + rof);
840         rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));
841 
842         if (update_pos) {
843                 if (slice->seg != seg) {
844                         rd_assert(seg->seg_absof + rof >= slice->start &&
845                                   seg->seg_absof + rof+rlen <= slice->end);
846                         slice->seg  = seg;
847                         slice->rof  = rlen;
848                 } else {
849                         slice->rof += rlen;
850                 }
851         }
852 
853         return rlen;
854 }
855 
856 
857 /**
858  * @brief Convenience reader iterator interface.
859  *
860  *        Call repeatedly from while loop until it returns 0.
861  *
862  * @param slice slice to read from, position will be updated.
863  * @param p will be set to the start of \p *rlenp contiguous bytes of memory
864  * @param rlenp will be set to the number of bytes available in \p p
865  *
866  * @returns the number of bytes read, or 0 if slice is empty.
867  */
868 size_t rd_slice_reader (rd_slice_t *slice, const void **p) {
869         return rd_slice_reader0(slice, p, 1/*update_pos*/);
870 }
871 
872 /**
873  * @brief Identical to rd_slice_reader() but does NOT update the read position
874  */
875 size_t rd_slice_peeker (const rd_slice_t *slice, const void **p) {
876         return rd_slice_reader0((rd_slice_t *)slice, p, 0/*dont update_pos*/);
877 }
878 
879 
880 
881 
882 
883 /**
884  * @brief Read \p size bytes from current read position,
885  *        advancing the read offset by the number of bytes copied to \p dst.
886  *
887  *        If there are less than \p size remaining in the buffer
888  *        then 0 is returned and no bytes are copied.
889  *
890  * @returns \p size, or 0 if \p size bytes are not available in buffer.
891  *
892  * @remark This performs a complete read, no partitial reads.
893  *
894  * @remark If \p dst is NULL only the read position is updated.
895  */
896 size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
897         size_t remains = size;
898         char *d = (char *)dst; /* Possibly NULL */
899         size_t rlen;
900         const void *p;
901         size_t orig_end = slice->end;
902 
903         if (unlikely(rd_slice_remains(slice) < size))
904                 return 0;
905 
906         /* Temporarily shrink slice to offset + \p size */
907         slice->end = rd_slice_abs_offset(slice) + size;
908 
909         while ((rlen = rd_slice_reader(slice, &p))) {
910                 rd_dassert(remains >= rlen);
911                 if (dst) {
912                         memcpy(d, p, rlen);
913                         d       += rlen;
914                 }
915                 remains -= rlen;
916         }
917 
918         rd_dassert(remains == 0);
919 
920         /* Restore original size */
921         slice->end = orig_end;
922 
923         return size;
924 }
925 
926 
927 /**
928  * @brief Read \p size bytes from absolute slice offset \p offset
929  *        and store in \p dst, without updating the slice read position.
930  *
931  * @returns \p size if the offset and size was within the slice, else 0.
932  */
933 size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
934                       void *dst, size_t size) {
935         rd_slice_t sub = *slice;
936 
937         if (unlikely(rd_slice_seek(&sub, offset) == -1))
938                 return 0;
939 
940         return rd_slice_read(&sub, dst, size);
941 
942 }
943 
944 
945 /**
946  * @brief Read a varint-encoded unsigned integer from \p slice,
947  *        storing the decoded number in \p nump on success (return value > 0).
948  *
949  * @returns the number of bytes read on success or 0 in case of
950  *          buffer underflow.
951  */
952 size_t rd_slice_read_uvarint (rd_slice_t *slice, uint64_t *nump) {
953         uint64_t num = 0;
954         int shift = 0;
955         size_t rof = slice->rof;
956         const rd_segment_t *seg;
957 
958         /* Traverse segments, byte for byte, until varint is decoded
959          * or no more segments available (underflow). */
960         for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
961                 for ( ; rof < seg->seg_of ; rof++) {
962                         unsigned char oct;
963 
964                         if (unlikely(seg->seg_absof+rof >= slice->end))
965                                 return 0; /* Underflow */
966 
967                         oct = *(const unsigned char *)(seg->seg_p + rof);
968 
969                         num |= (uint64_t)(oct & 0x7f) << shift;
970                         shift += 7;
971 
972                         if (!(oct & 0x80)) {
973                                 /* Done: no more bytes expected */
974                                 *nump = num;
975 
976                                 /* Update slice's read pointer and offset */
977                                 if (slice->seg != seg)
978                                         slice->seg = seg;
979                                 slice->rof = rof + 1; /* including the +1 byte
980                                                        * that was just read */
981 
982                                 return shift / 7;
983                         }
984                 }
985 
986                 rof = 0;
987         }
988 
989         return 0; /* Underflow */
990 }
991 
992 
993 /**
994  * @returns a pointer to \p size contiguous bytes at the current read offset.
995  *          If there isn't \p size contiguous bytes available NULL will
996  *          be returned.
997  *
998  * @remark The read position is updated to point past \p size.
999  */
1000 const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size) {
1001         void *p;
1002 
1003         if (unlikely(rd_slice_remains(slice) < size ||
1004                      slice->rof + size > slice->seg->seg_of))
1005                 return NULL;
1006 
1007         p = slice->seg->seg_p + slice->rof;
1008 
1009         rd_slice_read(slice, NULL, size);
1010 
1011         return p;
1012 }
1013 
1014 
1015 
1016 /**
1017  * @brief Sets the slice's read position. The offset is the slice offset,
1018  *        not buffer offset.
1019  *
1020  * @returns 0 if offset was within range, else -1 in which case the position
1021  *          is not changed.
1022  */
1023 int rd_slice_seek (rd_slice_t *slice, size_t offset) {
1024         const rd_segment_t *seg;
1025         size_t absof = slice->start + offset;
1026 
1027         if (unlikely(absof >= slice->end))
1028                 return -1;
1029 
1030         seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
1031         rd_assert(seg);
1032 
1033         slice->seg = seg;
1034         slice->rof = absof - seg->seg_absof;
1035         rd_assert(seg->seg_absof + slice->rof >= slice->start &&
1036                   seg->seg_absof + slice->rof <= slice->end);
1037 
1038         return 0;
1039 }
1040 
1041 
1042 /**
1043  * @brief Narrow the current slice to \p size, saving
1044  *        the original slice state info \p save_slice.
1045  *
1046  *        Use rd_slice_widen() to restore the saved slice
1047  *        with the read count updated from the narrowed slice.
1048  *
1049  *        This is useful for reading a sub-slice of a larger slice
1050  *        without having to pass the lesser length around.
1051  *
1052  * @returns 1 if enough underlying slice buffer memory is available, else 0.
1053  */
1054 int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size) {
1055         if (unlikely(slice->start + size > slice->end))
1056                 return 0;
1057         *save_slice = *slice;
1058         slice->end = slice->start + size;
1059         rd_assert(rd_slice_abs_offset(slice) <= slice->end);
1060         return 1;
1061 }
1062 
1063 /**
1064  * @brief Same as rd_slice_narrow() but using a relative size \p relsize
1065  *        from the current read position.
1066  */
1067 int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
1068                                size_t relsize) {
1069         return rd_slice_narrow(slice, save_slice,
1070                                rd_slice_offset(slice) + relsize);
1071 }
1072 
1073 
1074 /**
1075  * @brief Restore the original \p save_slice size from a previous call to
1076  *        rd_slice_narrow(), while keeping the updated read pointer from
1077  *        \p slice.
1078  */
1079 void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice) {
1080         slice->end = save_slice->end;
1081 }
1082 
1083 
1084 /**
1085  * @brief Copy the original slice \p orig to \p new_slice and adjust
1086  *        the new slice length to \p size.
1087  *
1088  *        This is a side-effect free form of rd_slice_narrow() which is not to
1089  *        be used with rd_slice_widen().
1090  *
1091  * @returns 1 if enough underlying slice buffer memory is available, else 0.
1092  */
1093 int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
1094                           size_t size) {
1095         if (unlikely(orig->start + size > orig->end))
1096                 return 0;
1097         *new_slice = *orig;
1098         new_slice->end = orig->start + size;
1099         rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end);
1100         return 1;
1101 }
1102 
1103 /**
1104  * @brief Same as rd_slice_narrow_copy() but with a relative size from
1105  *        the current read position.
1106  */
1107 int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
1108                                     rd_slice_t *new_slice,
1109                                     size_t relsize) {
1110         return rd_slice_narrow_copy(orig, new_slice,
1111                                     rd_slice_offset(orig) + relsize);
1112 }
1113 
1114 
1115 
1116 
1117 
1118 /**
1119  * @brief Set up the iovec \p iovs (of size \p iov_max) with the readable
1120  *        segments from the slice's current read position.
1121  *
1122  * @param iovcntp will be set to the number of populated \p iovs[]
1123  * @param size_max limits the total number of bytes made available.
1124  *                 Note: this value may be overshot with the size of one
1125  *                       segment.
1126  *
1127  * @returns the total number of bytes in the represented segments.
1128  *
1129  * @remark will NOT update the read position.
1130  */
1131 size_t rd_slice_get_iov (const rd_slice_t *slice,
1132                          struct iovec *iovs, size_t *iovcntp,
1133                          size_t iov_max, size_t size_max) {
1134         const void *p;
1135         size_t rlen;
1136         size_t iovcnt = 0;
1137         size_t sum = 0;
1138         rd_slice_t copy = *slice; /* Use a copy of the slice so we dont
1139                                    * update the position for the caller. */
1140 
1141         while (sum < size_max && iovcnt < iov_max &&
1142                (rlen = rd_slice_reader(&copy, &p))) {
1143                 iovs[iovcnt].iov_base  = (void *)p;
1144                 iovs[iovcnt++].iov_len = rlen;
1145 
1146                 sum += rlen;
1147         }
1148 
1149         *iovcntp = iovcnt;
1150 
1151         return sum;
1152 }
1153 
1154 
1155 
1156 
1157 
1158 /**
1159  * @brief CRC32 calculation of slice.
1160  *
1161  * @returns the calculated CRC
1162  *
1163  * @remark the slice's position is updated.
1164  */
1165 uint32_t rd_slice_crc32 (rd_slice_t *slice) {
1166         rd_crc32_t crc;
1167         const void *p;
1168         size_t rlen;
1169 
1170         crc = rd_crc32_init();
1171 
1172         while ((rlen = rd_slice_reader(slice, &p)))
1173                 crc = rd_crc32_update(crc, p, rlen);
1174 
1175         return (uint32_t)rd_crc32_finalize(crc);
1176 }
1177 
1178 /**
1179  * @brief Compute CRC-32C of segments starting at at buffer position \p absof,
1180  *        also supporting the case where the position/offset is not at the
1181  *        start of the first segment.
1182  *
1183  * @remark the slice's position is updated.
1184  */
1185 uint32_t rd_slice_crc32c (rd_slice_t *slice) {
1186         const void *p;
1187         size_t rlen;
1188         uint32_t crc = 0;
1189 
1190         while ((rlen = rd_slice_reader(slice, &p)))
1191                 crc = crc32c(crc, (const char *)p, rlen);
1192 
1193         return crc;
1194 }
1195 
1196 
1197 
1198 
1199 
1200 /**
1201  * @name Debugging dumpers
1202  *
1203  *
1204  */
1205 
1206 static void rd_segment_dump (const rd_segment_t *seg, const char *ind,
1207                              size_t relof, int do_hexdump) {
1208         fprintf(stderr,
1209                 "%s((rd_segment_t *)%p): "
1210                 "p %p, of %"PRIusz", "
1211                 "absof %"PRIusz", size %"PRIusz", free %p, flags 0x%x\n",
1212                 ind, seg, seg->seg_p, seg->seg_of,
1213                 seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags);
1214         rd_assert(relof <= seg->seg_of);
1215         if (do_hexdump)
1216                 rd_hexdump(stderr, "segment",
1217                            seg->seg_p+relof, seg->seg_of-relof);
1218 }
1219 
1220 void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump) {
1221         const rd_segment_t *seg;
1222 
1223         fprintf(stderr,
1224                 "((rd_buf_t *)%p):\n"
1225                 " len %"PRIusz" size %"PRIusz
1226                 ", %"PRIusz"/%"PRIusz" extra memory used\n",
1227                 rbuf, rbuf->rbuf_len, rbuf->rbuf_size,
1228                 rbuf->rbuf_extra_len, rbuf->rbuf_extra_size);
1229 
1230         if (rbuf->rbuf_wpos) {
1231                 fprintf(stderr, " wpos:\n");
1232                 rd_segment_dump(rbuf->rbuf_wpos, "  ", 0, 0);
1233         }
1234 
1235         if (rbuf->rbuf_segment_cnt > 0) {
1236                 size_t segcnt = 0;
1237 
1238                 fprintf(stderr, " %"PRIusz" linked segments:\n",
1239                         rbuf->rbuf_segment_cnt);
1240                 TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) {
1241                         rd_segment_dump(seg, "  ", 0, do_hexdump);
1242                         segcnt++;
1243                         rd_assert(segcnt <= rbuf->rbuf_segment_cnt);
1244                 }
1245         }
1246 }
1247 
1248 void rd_slice_dump (const rd_slice_t *slice, int do_hexdump) {
1249         const rd_segment_t *seg;
1250         size_t relof;
1251 
1252         fprintf(stderr,
1253                 "((rd_slice_t *)%p):\n"
1254                 "  buf %p (len %"PRIusz"), seg %p (absof %"PRIusz"), "
1255                 "rof %"PRIusz", start %"PRIusz", end %"PRIusz", size %"PRIusz
1256                 ", offset %"PRIusz"\n",
1257                 slice, slice->buf, rd_buf_len(slice->buf),
1258                 slice->seg, slice->seg ? slice->seg->seg_absof : 0,
1259                 slice->rof, slice->start, slice->end,
1260                 rd_slice_size(slice), rd_slice_offset(slice));
1261         relof = slice->rof;
1262 
1263         for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
1264                 rd_segment_dump(seg, "  ", relof, do_hexdump);
1265                 relof = 0;
1266         }
1267 }
1268 
1269 
1270 /**
1271  * @name Unit-tests
1272  *
1273  *
1274  *
1275  */
1276 
1277 
1278 /**
1279  * @brief Basic write+read test
1280  */
1281 static int do_unittest_write_read (void) {
1282         rd_buf_t b;
1283         char ones[1024];
1284         char twos[1024];
1285         char threes[1024];
1286         char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
1287         char buf[1024*3];
1288         rd_slice_t slice;
1289         size_t r, pos;
1290 
1291         memset(ones, 0x1, sizeof(ones));
1292         memset(twos, 0x2, sizeof(twos));
1293         memset(threes, 0x3, sizeof(threes));
1294         memset(fiftyfives, 0x55, sizeof(fiftyfives));
1295         memset(buf, 0x55, sizeof(buf));
1296 
1297         rd_buf_init(&b, 2, 1000);
1298 
1299         /*
1300          * Verify write
1301          */
1302         r = rd_buf_write(&b, ones, 200);
1303         RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
1304         pos = rd_buf_write_pos(&b);
1305         RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
1306 
1307         r = rd_buf_write(&b, twos, 800);
1308         RD_UT_ASSERT(r == 200, "write() returned position %"PRIusz, r);
1309         pos = rd_buf_write_pos(&b);
1310         RD_UT_ASSERT(pos == 200+800, "pos() returned position %"PRIusz, pos);
1311 
1312         /* Buffer grows here */
1313         r = rd_buf_write(&b, threes, 1);
1314         RD_UT_ASSERT(pos == 200+800,
1315                      "write() returned position %"PRIusz, r);
1316         pos = rd_buf_write_pos(&b);
1317         RD_UT_ASSERT(pos == 200+800+1, "pos() returned position %"PRIusz, pos);
1318 
1319         /*
1320          * Verify read
1321          */
1322         /* Get full slice. */
1323         rd_slice_init_full(&slice, &b);
1324 
1325         r = rd_slice_read(&slice, buf, 200+800+2);
1326         RD_UT_ASSERT(r == 0,
1327                      "read() > remaining should have failed, gave %"PRIusz, r);
1328         r = rd_slice_read(&slice, buf, 200+800+1);
1329         RD_UT_ASSERT(r == 200+800+1,
1330                      "read() returned %"PRIusz" (%"PRIusz" remains)",
1331                      r, rd_slice_remains(&slice));
1332 
1333         RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones");
1334         RD_UT_ASSERT(!memcmp(buf+200, twos, 800), "verify twos");
1335         RD_UT_ASSERT(!memcmp(buf+200+800, threes, 1), "verify threes");
1336         RD_UT_ASSERT(!memcmp(buf+200+800+1, fiftyfives, 100), "verify 55s");
1337 
1338         rd_buf_destroy(&b);
1339 
1340         RD_UT_PASS();
1341 }
1342 
1343 
1344 /**
1345  * @brief Helper read verifier, not a unit-test itself.
1346  */
1347 #define do_unittest_read_verify(b,absof,len,verify) do {                \
1348                 int __fail = do_unittest_read_verify0(b,absof,len,verify); \
1349                 RD_UT_ASSERT(!__fail,                                   \
1350                              "read_verify(absof=%"PRIusz",len=%"PRIusz") " \
1351                              "failed", (size_t)absof, (size_t)len);     \
1352         } while (0)
1353 
1354 static int
1355 do_unittest_read_verify0 (const rd_buf_t *b, size_t absof, size_t len,
1356                           const char *verify) {
1357         rd_slice_t slice, sub;
1358         char buf[1024];
1359         size_t half;
1360         size_t r;
1361         int i;
1362 
1363         rd_assert(sizeof(buf) >= len);
1364 
1365         /* Get reader slice */
1366         i = rd_slice_init(&slice, b, absof, len);
1367         RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i);
1368 
1369         r = rd_slice_read(&slice, buf, len);
1370         RD_UT_ASSERT(r == len,
1371                      "read() returned %"PRIusz" expected %"PRIusz
1372                      " (%"PRIusz" remains)",
1373                      r, len, rd_slice_remains(&slice));
1374 
1375         RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
1376 
1377         r = rd_slice_offset(&slice);
1378         RD_UT_ASSERT(r == len, "offset() returned %"PRIusz", not %"PRIusz,
1379                      r, len);
1380 
1381         half = len / 2;
1382         i = rd_slice_seek(&slice, half);
1383         RD_UT_ASSERT(i == 0, "seek(%"PRIusz") returned %d", half, i);
1384         r = rd_slice_offset(&slice);
1385         RD_UT_ASSERT(r == half, "offset() returned %"PRIusz", not %"PRIusz,
1386                      r, half);
1387 
1388         /* Get a sub-slice covering the later half. */
1389         sub = rd_slice_pos(&slice);
1390         r = rd_slice_offset(&sub);
1391         RD_UT_ASSERT(r == 0, "sub: offset() returned %"PRIusz", not %"PRIusz,
1392                      r, (size_t)0);
1393         r = rd_slice_size(&sub);
1394         RD_UT_ASSERT(r == half, "sub: size() returned %"PRIusz", not %"PRIusz,
1395                      r, half);
1396         r = rd_slice_remains(&sub);
1397         RD_UT_ASSERT(r == half,
1398                      "sub: remains() returned %"PRIusz", not %"PRIusz,
1399                      r, half);
1400 
1401         /* Read half */
1402         r = rd_slice_read(&sub, buf, half);
1403         RD_UT_ASSERT(r == half,
1404                      "sub read() returned %"PRIusz" expected %"PRIusz
1405                      " (%"PRIusz" remains)",
1406                      r, len, rd_slice_remains(&sub));
1407 
1408         RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
1409 
1410         r = rd_slice_offset(&sub);
1411         RD_UT_ASSERT(r == rd_slice_size(&sub),
1412                      "sub offset() returned %"PRIusz", not %"PRIusz,
1413                      r, rd_slice_size(&sub));
1414         r = rd_slice_remains(&sub);
1415         RD_UT_ASSERT(r == 0,
1416                      "sub: remains() returned %"PRIusz", not %"PRIusz,
1417                      r, (size_t)0);
1418 
1419         return 0;
1420 }
1421 
1422 
1423 /**
1424  * @brief write_seek() and split() test
1425  */
1426 static int do_unittest_write_split_seek (void) {
1427         rd_buf_t b;
1428         char ones[1024];
1429         char twos[1024];
1430         char threes[1024];
1431         char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
1432         char buf[1024*3];
1433         size_t r, pos;
1434         rd_segment_t *seg, *newseg;
1435 
1436         memset(ones, 0x1, sizeof(ones));
1437         memset(twos, 0x2, sizeof(twos));
1438         memset(threes, 0x3, sizeof(threes));
1439         memset(fiftyfives, 0x55, sizeof(fiftyfives));
1440         memset(buf, 0x55, sizeof(buf));
1441 
1442         rd_buf_init(&b, 0, 0);
1443 
1444         /*
1445          * Verify write
1446          */
1447         r = rd_buf_write(&b, ones, 400);
1448         RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
1449         pos = rd_buf_write_pos(&b);
1450         RD_UT_ASSERT(pos == 400, "pos() returned position %"PRIusz, pos);
1451 
1452         do_unittest_read_verify(&b, 0, 400, ones);
1453 
1454         /*
1455          * Seek and re-write
1456          */
1457         r = rd_buf_write_seek(&b, 200);
1458         RD_UT_ASSERT(r == 0, "seek() failed");
1459         pos = rd_buf_write_pos(&b);
1460         RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
1461 
1462         r = rd_buf_write(&b, twos, 100);
1463         RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r);
1464         pos = rd_buf_write_pos(&b);
1465         RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1466 
1467         do_unittest_read_verify(&b, 0, 200, ones);
1468         do_unittest_read_verify(&b, 200, 100, twos);
1469 
1470         /* Make sure read() did not modify the write position. */
1471         pos = rd_buf_write_pos(&b);
1472         RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1473 
1474         /* Split buffer, write position is now at split where writes
1475         * are not allowed (mid buffer). */
1476         seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
1477         RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment");
1478         newseg = rd_segment_split(&b, seg, 50);
1479         rd_buf_append_segment(&b, newseg);
1480         seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
1481         RD_UT_ASSERT(seg != NULL, "seg");
1482         RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg);
1483         RD_UT_ASSERT(seg->seg_of > 0,
1484                      "assumed beginning of segment, got %"PRIusz, seg->seg_of);
1485 
1486         pos = rd_buf_write_pos(&b);
1487         RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1488 
1489         /* Re-verify that nothing changed */
1490         do_unittest_read_verify(&b, 0, 200, ones);
1491         do_unittest_read_verify(&b, 200, 100, twos);
1492 
1493         /* Do a write seek at buffer boundary, sub-sequent buffers should
1494          * be destroyed. */
1495         r = rd_buf_write_seek(&b, 50);
1496         RD_UT_ASSERT(r == 0, "seek() failed");
1497         do_unittest_read_verify(&b, 0, 50, ones);
1498 
1499         rd_buf_destroy(&b);
1500 
1501         RD_UT_PASS();
1502 }
1503 
1504 /**
1505  * @brief Unittest to verify payload is correctly written and read.
1506  *        Each written u32 word is the running CRC of the word count.
1507  */
1508 static int do_unittest_write_read_payload_correctness (void) {
1509         uint32_t crc;
1510         uint32_t write_crc, read_crc;
1511         const int seed = 12345;
1512         rd_buf_t b;
1513         const size_t max_cnt = 20000;
1514         rd_slice_t slice;
1515         size_t r;
1516         size_t i;
1517         int pass;
1518 
1519         crc = rd_crc32_init();
1520         crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
1521 
1522         rd_buf_init(&b, 0, 0);
1523         for (i = 0 ; i < max_cnt ; i++) {
1524                 crc = rd_crc32_update(crc, (void *)&i, sizeof(i));
1525                 rd_buf_write(&b, &crc, sizeof(crc));
1526         }
1527 
1528         write_crc = rd_crc32_finalize(crc);
1529 
1530         r = rd_buf_len(&b);
1531         RD_UT_ASSERT(r == max_cnt * sizeof(crc),
1532                      "expected length %"PRIusz", not %"PRIusz,
1533                      r, max_cnt * sizeof(crc));
1534 
1535         /*
1536          * Now verify the contents with a reader.
1537          */
1538         rd_slice_init_full(&slice, &b);
1539 
1540         r = rd_slice_remains(&slice);
1541         RD_UT_ASSERT(r == rd_buf_len(&b),
1542                      "slice remains %"PRIusz", should be %"PRIusz,
1543                      r, rd_buf_len(&b));
1544 
1545         for (pass = 0 ; pass < 2 ; pass++) {
1546                 /* Two passes:
1547                  *  - pass 1: using peek()
1548                  *  - pass 2: using read()
1549                  */
1550                 const char *pass_str = pass == 0 ? "peek":"read";
1551 
1552                 crc = rd_crc32_init();
1553                 crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
1554 
1555                 for (i = 0 ; i < max_cnt ; i++) {
1556                         uint32_t buf_crc;
1557 
1558                         crc = rd_crc32_update(crc, (void *)&i, sizeof(i));
1559 
1560                         if (pass == 0)
1561                                 r = rd_slice_peek(&slice, i * sizeof(buf_crc),
1562                                                   &buf_crc, sizeof(buf_crc));
1563                         else
1564                                 r = rd_slice_read(&slice, &buf_crc,
1565                                                   sizeof(buf_crc));
1566                         RD_UT_ASSERT(r == sizeof(buf_crc),
1567                                      "%s() at #%"PRIusz" failed: "
1568                                      "r is %"PRIusz" not %"PRIusz,
1569                                      pass_str, i, r, sizeof(buf_crc));
1570                         RD_UT_ASSERT(buf_crc == crc,
1571                                      "%s: invalid crc at #%"PRIusz
1572                                      ": expected %"PRIu32", read %"PRIu32,
1573                                      pass_str, i, crc, buf_crc);
1574                 }
1575 
1576                 read_crc = rd_crc32_finalize(crc);
1577 
1578                 RD_UT_ASSERT(read_crc == write_crc,
1579                              "%s: finalized read crc %"PRIu32
1580                              " != write crc %"PRIu32,
1581                              pass_str, read_crc, write_crc);
1582 
1583         }
1584 
1585         r = rd_slice_remains(&slice);
1586         RD_UT_ASSERT(r == 0,
1587                      "slice remains %"PRIusz", should be %"PRIusz,
1588                      r, (size_t)0);
1589 
1590         rd_buf_destroy(&b);
1591 
1592         RD_UT_PASS();
1593 }
1594 
/**
 * @brief Helper iovec verifier, not a unit-test itself.
 *
 *        Wraps do_unittest_iov_verify0() and asserts that it returned 0.
 */
#define do_unittest_iov_verify(...) do {                                \
                int __fail = do_unittest_iov_verify0(__VA_ARGS__);      \
                RD_UT_ASSERT(!__fail, "iov_verify() failed");           \
        } while (0)
/**
 * @brief Retrieve the write iovecs of buffer \p b and verify them.
 *
 *        Checks that the total size is at least \p exp_totsize,
 *        that the iovec count is at least \p exp_iovcnt and at most
 *        MY_IOV_MAX, that each iovec has a base pointer and a non-zero
 *        length, and that the iovec lengths add up to the total size.
 *
 * @returns 0 if all checks passed.
 */
static int do_unittest_iov_verify0 (rd_buf_t *b,
                                    size_t exp_iovcnt, size_t exp_totsize) {
        /* Capacity of the local iovec array */
        #define MY_IOV_MAX 16
        struct iovec iov[MY_IOV_MAX];
        size_t iovcnt;
        size_t i;
        size_t totsize, sum;

        rd_assert(exp_iovcnt <= MY_IOV_MAX);

        /* exp_totsize is also passed as the size limit of the request */
        totsize = rd_buf_get_write_iov(b, iov, &iovcnt,
                                       MY_IOV_MAX, exp_totsize);
        RD_UT_ASSERT(totsize >= exp_totsize,
                     "iov total size %"PRIusz" expected >= %"PRIusz,
                     totsize, exp_totsize);
        /* NOTE(review): the message says "< x" while the check
         * is ">= exp_iovcnt". */
        RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX,
                     "iovcnt %"PRIusz
                     ", expected %"PRIusz" < x <= MY_IOV_MAX",
                     iovcnt, exp_iovcnt);

        /* Each populated iovec must be usable, and the running sum
         * of lengths must never exceed the reported total. */
        sum = 0;
        for (i = 0 ; i < iovcnt ; i++) {
                RD_UT_ASSERT(iov[i].iov_base,
                             "iov #%"PRIusz" iov_base not set", i);
                RD_UT_ASSERT(iov[i].iov_len,
                             "iov #%"PRIusz" iov_len %"PRIusz" out of range",
                             i, iov[i].iov_len);
                sum += iov[i].iov_len;
                RD_UT_ASSERT(sum <= totsize, "sum %"PRIusz" > totsize %"PRIusz,
                             sum, totsize);
        }

        RD_UT_ASSERT(sum == totsize,
                     "sum %"PRIusz" != totsize %"PRIusz,
                     sum, totsize);

        return 0;
}
1637 
1638 
1639 /**
1640  * @brief Verify that buffer to iovec conversion works.
1641  */
1642 static int do_unittest_write_iov (void) {
1643         rd_buf_t b;
1644 
1645         rd_buf_init(&b, 0, 0);
1646         rd_buf_write_ensure(&b, 100, 100);
1647 
1648         do_unittest_iov_verify(&b, 1, 100);
1649 
1650         /* Add a secondary buffer */
1651         rd_buf_write_ensure(&b, 30000, 0);
1652 
1653         do_unittest_iov_verify(&b, 2, 100+30000);
1654 
1655 
1656         rd_buf_destroy(&b);
1657 
1658         RD_UT_PASS();
1659 }
1660 
1661 /**
1662  * @brief Verify that erasing parts of the buffer works.
1663  */
1664 static int do_unittest_erase (void) {
1665         static const struct {
1666                 const char *segs[4];
1667                 const char *writes[4];
1668                 struct {
1669                         size_t of;
1670                         size_t size;
1671                         size_t retsize;
1672                 } erasures[4];
1673 
1674                 const char *expect;
1675         } in[] = {
1676                 /* 12|3|45
1677                  *  x x xx */
1678                 { .segs = { "12", "3", "45" },
1679                   .erasures = { { 1, 4, 4 } },
1680                   .expect = "1",
1681                 },
1682                 /* 12|3|45
1683                  * xx */
1684                 { .segs = { "12", "3", "45" },
1685                   .erasures = { { 0, 2, 2 } },
1686                   .expect = "345",
1687                 },
1688                 /* 12|3|45
1689                  *      xx */
1690                 { .segs = { "12", "3", "45" },
1691                   .erasures = { { 3, 2, 2 } },
1692                   .expect = "123",
1693                 },
1694                 /* 12|3|45
1695                  *  x
1696                  * 1 |3|45
1697                  *    x
1698                  * 1 |  45
1699                  *       x */
1700                 { .segs = { "12", "3", "45" },
1701                   .erasures = { { 1, 1, 1 },
1702                                 { 1, 1, 1 },
1703                                 { 2, 1, 1 } },
1704                   .expect = "14",
1705                 },
1706                 /* 12|3|45
1707                  * xxxxxxx */
1708                 { .segs = { "12", "3", "45" },
1709                   .erasures = { { 0, 5, 5 } },
1710                   .expect = "",
1711                 },
1712                 /* 12|3|45
1713                  * x       */
1714                 { .segs = { "12", "3", "45" },
1715                   .erasures = { { 0, 1, 1 } },
1716                   .expect = "2345",
1717                 },
1718                 /* 12|3|45
1719                  *       x  */
1720                 { .segs = { "12", "3", "45" },
1721                   .erasures = { { 4, 1, 1 } },
1722                   .expect = "1234",
1723                 },
1724                 /* 12|3|45
1725                  *        x  */
1726                 { .segs = { "12", "3", "45" },
1727                   .erasures = { { 5, 10, 0 } },
1728                   .expect = "12345",
1729                 },
1730                 /* 12|3|45
1731                  *       xxx */
1732                 { .segs = { "12", "3", "45" },
1733                   .erasures = { { 4, 3, 1 }, { 4, 3, 0 }, { 4, 3, 0 } },
1734                   .expect = "1234",
1735                 },
1736                 /* 1
1737                  * xxx */
1738                 { .segs = { "1" },
1739                   .erasures = { { 0, 3, 1 } },
1740                   .expect = "",
1741                 },
1742                 /* 123456
1743                  * xxxxxx */
1744                 { .segs = { "123456" },
1745                   .erasures = { { 0, 6, 6 } },
1746                   .expect = "",
1747                 },
1748                 /* 123456789a
1749                  *     xxx    */
1750                 { .segs = { "123456789a" },
1751                   .erasures = { { 4, 3, 3 } },
1752                   .expect = "123489a",
1753                 },
1754                 /* 1234|5678
1755                  *    x xx   */
1756                 { .segs = { "1234", "5678" },
1757                   .erasures = { { 3, 3, 3 } },
1758                   .writes = { "9abc" },
1759                   .expect = "123789abc"
1760                 },
1761 
1762                 { .expect = NULL }
1763         };
1764         int i;
1765 
1766         for (i = 0 ; in[i].expect ; i++) {
1767                 rd_buf_t b;
1768                 rd_slice_t s;
1769                 size_t expsz = strlen(in[i].expect);
1770                 char *out;
1771                 int j;
1772                 size_t r;
1773                 int r2;
1774 
1775                 rd_buf_init(&b, 0, 0);
1776 
1777                 /* Write segments to buffer */
1778                 for (j = 0 ; in[i].segs[j] ; j++)
1779                         rd_buf_push_writable(&b, rd_strdup(in[i].segs[j]),
1780                                              strlen(in[i].segs[j]), rd_free);
1781 
1782                 /* Perform erasures */
1783                 for (j = 0 ; in[i].erasures[j].retsize ; j++) {
1784                         r = rd_buf_erase(&b,
1785                                          in[i].erasures[j].of,
1786                                          in[i].erasures[j].size);
1787                         RD_UT_ASSERT(r == in[i].erasures[j].retsize,
1788                                      "expected retsize %"PRIusz" for i=%d,j=%d"
1789                                      ", not %"PRIusz,
1790                                      in[i].erasures[j].retsize, i, j, r);
1791                 }
1792 
1793                 /* Perform writes */
1794                 for (j = 0 ; in[i].writes[j] ; j++)
1795                         rd_buf_write(&b, in[i].writes[j],
1796                                      strlen(in[i].writes[j]));
1797 
1798                 RD_UT_ASSERT(expsz == rd_buf_len(&b),
1799                              "expected buffer to be %"PRIusz" bytes, not "
1800                              "%"PRIusz" for i=%d",
1801                              expsz, rd_buf_len(&b), i);
1802 
1803                 /* Read back and verify */
1804                 r2 = rd_slice_init(&s, &b, 0, rd_buf_len(&b));
1805                 RD_UT_ASSERT((r2 == -1 && rd_buf_len(&b) == 0) ||
1806                              (r2 == 0 && rd_buf_len(&b) > 0),
1807                              "slice_init(%"PRIusz") returned %d for i=%d",
1808                              rd_buf_len(&b), r2, i);
1809                 if (r2 == -1)
1810                         continue; /* Empty buffer */
1811 
1812                 RD_UT_ASSERT(expsz == rd_slice_size(&s),
1813                              "expected slice to be %"PRIusz" bytes, not %"PRIusz
1814                              " for i=%d",
1815                              expsz, rd_slice_size(&s), i);
1816 
1817                 out = rd_malloc(expsz);
1818 
1819                 r = rd_slice_read(&s, out, expsz);
1820                 RD_UT_ASSERT(r == expsz,
1821                              "expected to read %"PRIusz" bytes, not %"PRIusz
1822                              " for i=%d",
1823                              expsz, r, i);
1824 
1825                 RD_UT_ASSERT(!memcmp(out, in[i].expect, expsz),
1826                              "Expected \"%.*s\", not \"%.*s\" for i=%d",
1827                              (int)expsz, in[i].expect,
1828                              (int)r, out, i);
1829 
1830                 rd_free(out);
1831 
1832                 RD_UT_ASSERT(rd_slice_remains(&s) == 0,
1833                              "expected no remaining bytes in slice, but got "
1834                              "%"PRIusz" for i=%d",
1835                              rd_slice_remains(&s), i);
1836 
1837                 rd_buf_destroy(&b);
1838         }
1839 
1840 
1841         RD_UT_PASS();
1842 }
1843 
1844 
/**
 * @brief Entry point for the rdbuf unit tests.
 *
 * Invokes each of the do_unittest_*() routines below, in the listed order,
 * and accumulates their return values.
 *
 * @returns the sum of the values returned by the individual test routines
 *          (presumably the number of failed tests, 0 meaning all passed).
 */
int unittest_rdbuf (void) {
        int total;

        /* Keep each call as its own statement so the tests run in a
         * well-defined order. */
        total  = do_unittest_write_read();
        total += do_unittest_write_split_seek();
        total += do_unittest_write_read_payload_correctness();
        total += do_unittest_write_iov();
        total += do_unittest_erase();

        return total;
}
1856