1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2017 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 30 #include "rd.h" 31 #include "rdbuf.h" 32 #include "rdunittest.h" 33 #include "rdlog.h" 34 #include "rdcrc32.h" 35 #include "crc32c.h" 36 37 38 static size_t 39 rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p); 40 41 42 /** 43 * @brief Destroy the segment and free its payload. 44 * 45 * @remark Will NOT unlink from buffer. 
 */
static void rd_segment_destroy (rd_segment_t *seg) {
        /* Free payload if the segment owns it (seg_free set). */
        if (seg->seg_free && seg->seg_p)
                seg->seg_free(seg->seg_p);

        /* Free the segment header itself only if it was malloc():ed,
         * not if it was carved out of the buffer's extra memory. */
        if (seg->seg_flags & RD_SEGMENT_F_FREE)
                rd_free(seg);
}

/**
 * @brief Initialize segment with backing memory pointer \p mem and
 *        backing memory size \p size.
 *
 * @remark The segment is NOT linked to any buffer.
 */
static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
        memset(seg, 0, sizeof(*seg));
        seg->seg_p = mem;
        seg->seg_size = size;
}


/**
 * @brief Append segment to buffer.
 *
 * @remark Will set the buffer write position to \p seg if no existing wpos.
 * @remark Will set the segment seg_absof to the current length of the buffer.
 */
static rd_segment_t *rd_buf_append_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
        TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link);
        rbuf->rbuf_segment_cnt++;
        seg->seg_absof = rbuf->rbuf_len;
        rbuf->rbuf_len += seg->seg_of;
        rbuf->rbuf_size += seg->seg_size;

        /* Update writable position */
        if (!rbuf->rbuf_wpos)
                rbuf->rbuf_wpos = seg;
        else
                rd_buf_get_writable0(rbuf, NULL, NULL);

        return seg;
}




/**
 * @brief Attempt to allocate \p size bytes from the buffer's extra memory.
 *
 * @returns the allocated pointer which MUST NOT be freed, or NULL if
 *          not enough memory.
 * @remark the returned pointer is memory-aligned (8 bytes) to be safe.
 */
static void *extra_alloc (rd_buf_t *rbuf, size_t size) {
        size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */
        void *p;

        if (of + size > rbuf->rbuf_extra_size)
                return NULL;

        p = rbuf->rbuf_extra + of; /* Aligned pointer */

        rbuf->rbuf_extra_len = of + size;

        return p;
}



/**
 * @brief Get a pre-allocated segment if available, or allocate a new
 *        segment with the extra amount of \p size bytes allocated for payload.
 *
 *        Will not append the segment to the buffer.
 */
static rd_segment_t *
rd_buf_alloc_segment0 (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *seg;

        /* See if there is enough room in the extra buffer for
         * allocating the segment header and the buffer,
         * or just the segment header, else fall back to malloc. */
        if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) {
                /* Header and payload carved contiguously out of the
                 * extra buffer: payload memory follows the header. */
                rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);

        } else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) {
                /* Header from extra buffer, payload from the heap:
                 * the payload must be freed with the segment. */
                rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size);
                if (size > 0)
                        seg->seg_free = rd_free;

        } else if ((seg = rd_malloc(sizeof(*seg) + size))) {
                /* Header and payload in one heap allocation:
                 * the header itself must be freed on destroy. */
                rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
                seg->seg_flags |= RD_SEGMENT_F_FREE;

        } else
                rd_assert(!*"segment allocation failure");

        return seg;
}

/**
 * @brief Allocate between \p min_size .. \p max_size of backing memory
 *        and add it as a new segment to the buffer.
 *
 *        The buffer position is updated to point to the new segment.
 *
 *        The segment will be over-allocated if permitted by max_size
 *        (max_size == 0 or max_size > min_size).
 */
static rd_segment_t *
rd_buf_alloc_segment (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
        rd_segment_t *seg;

        /* Over-allocate if allowed: grow proportionally to the current
         * buffer size to amortize future allocations. */
        if (min_size != max_size || max_size == 0)
                max_size = RD_MAX(sizeof(*seg) * 4,
                                  RD_MAX(min_size * 2,
                                         rbuf->rbuf_size / 2));

        seg = rd_buf_alloc_segment0(rbuf, max_size);

        rd_buf_append_segment(rbuf, seg);

        return seg;
}


/**
 * @brief Ensures that \p size bytes will be available
 *        for writing and the position will be updated to point to the
 *        start of this contiguous block.
 */
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *seg = rbuf->rbuf_wpos;

        if (seg) {
                void *p;
                size_t remains = rd_segment_write_remains(seg, &p);

                if (remains >= size)
                        return; /* Existing segment has enough space. */

                /* Future optimization:
                 * If existing segment has enough remaining space to warrant
                 * a split, do it, before allocating a new one. */
        }

        /* Allocate new segment sized exactly \p size (min == max)
         * so the block is guaranteed contiguous. */
        rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
}

/**
 * @brief Ensures that at least \p min_size bytes will be available for
 *        a future write.
 *
 *        Typically used prior to a call to rd_buf_get_write_iov().
 */
void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
        size_t remains;
        /* Keep appending segments until the remaining writable
         * space satisfies min_size. */
        while ((remains = rd_buf_write_remains(rbuf)) < min_size)
                rd_buf_alloc_segment(rbuf,
                                     min_size - remains,
                                     max_size ? max_size - remains : 0);
}


/**
 * @returns the segment at absolute offset \p absof, or NULL if out of range.
 *
 * @remark \p hint is an optional segment where to start looking, such as
 *         the current write or read position.
 */
rd_segment_t *
rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
                              size_t absof) {
        const rd_segment_t *seg = hint;

        if (unlikely(absof >= rbuf->rbuf_len))
                return NULL;

        /* Only use the hint if it does not lie past \p absof,
         * else restart the scan from the first segment. */
        if (!seg || absof < seg->seg_absof)
                seg = TAILQ_FIRST(&rbuf->rbuf_segments);

        do {
                if (absof >= seg->seg_absof &&
                    absof < seg->seg_absof + seg->seg_of) {
                        rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
                        return (rd_segment_t *)seg;
                }
        } while ((seg = TAILQ_NEXT(seg, seg_link)));

        return NULL;
}


/**
 * @brief Split segment \p seg at absolute offset \p absof, appending
 *        a new segment after \p seg with its memory pointing to the
 *        memory starting at \p absof.
 *        \p seg 's memory will be shortened to the \p absof.
 *
 *        The new segment is NOT appended to the buffer.
 *
 * @warning MUST ONLY be used on the LAST segment
 *
 * @warning if a segment is inserted between these two split parts
 *          it is imperative that the later segment's absof is corrected.
 *
 * @remark The seg_free callback is retained on the original \p seg
 *         and is not copied to the new segment, but flags are copied.
 */
static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
                                       size_t absof) {
        rd_segment_t *newseg;
        size_t relof;

        /* Only the last (write position) segment may be split. */
        rd_assert(seg == rbuf->rbuf_wpos);
        rd_assert(absof >= seg->seg_absof &&
                  absof <= seg->seg_absof + seg->seg_of);

        relof = absof - seg->seg_absof;

        /* Payload-less segment header: it will point into
         * \p seg 's existing memory below. */
        newseg = rd_buf_alloc_segment0(rbuf, 0);

        /* Add later part of split bytes to new segment */
        newseg->seg_p = seg->seg_p+relof;
        newseg->seg_of = seg->seg_of-relof;
        newseg->seg_size = seg->seg_size-relof;
        newseg->seg_absof = SIZE_MAX; /* Invalid; set by append_segment() */
        newseg->seg_flags |= seg->seg_flags;

        /* Remove earlier part of split bytes from previous segment */
        seg->seg_of = relof;
        seg->seg_size = relof;

        /* newseg's length will be added to rbuf_len in append_segment(),
         * so shave it off here from seg's perspective. */
        rbuf->rbuf_len -= newseg->seg_of;
        rbuf->rbuf_size -= newseg->seg_size;

        return newseg;
}




/**
 * @brief Unlink and destroy a segment, updating the \p rbuf
 *        with the decrease in length and capacity.
 */
static void rd_buf_destroy_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
        rd_assert(rbuf->rbuf_segment_cnt > 0 &&
                  rbuf->rbuf_len >= seg->seg_of &&
                  rbuf->rbuf_size >= seg->seg_size);

        TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link);
        rbuf->rbuf_segment_cnt--;
        rbuf->rbuf_len -= seg->seg_of;
        rbuf->rbuf_size -= seg->seg_size;
        /* Invalidate write position if it referenced this segment;
         * it is re-resolved on the next writable lookup. */
        if (rbuf->rbuf_wpos == seg)
                rbuf->rbuf_wpos = NULL;

        rd_segment_destroy(seg);
}


/**
 * @brief Free memory associated with the \p rbuf, but not the rbuf itself.
 *        Segments will be destroyed.
 */
void rd_buf_destroy (rd_buf_t *rbuf) {
        rd_segment_t *seg, *tmp;

#if ENABLE_DEVEL
        /* FIXME: devel-only fill-grade report, currently disabled ("&& 0") */
        if (rbuf->rbuf_len > 0 && 0) {
                size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len;
                float fill_grade = (float)rbuf->rbuf_len /
                        (float)rbuf->rbuf_size;

                printf("fill grade: %.2f%% (%"PRIusz" bytes over-allocated)\n",
                       fill_grade * 100.0f, overalloc);
        }
#endif


        TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) {
                rd_segment_destroy(seg);

        }

        if (rbuf->rbuf_extra)
                rd_free(rbuf->rbuf_extra);
}


/**
 * @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments
 *        where the first segment will have a \p buf_size of backing memory.
 *
 *        The caller may rearrange the backing memory as it see fits.
 */
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
        size_t totalloc = 0;

        memset(rbuf, 0, sizeof(*rbuf));
        TAILQ_INIT(&rbuf->rbuf_segments);

        if (!fixed_seg_cnt) {
                assert(!buf_size);
                return;
        }

        /* Pre-allocate memory for a fixed set of segments that are known
         * before-hand, to minimize the number of extra allocations
         * needed for well-known layouts (such as headers, etc) */
        totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;

        /* Pre-allocate extra space for the backing buffer. */
        totalloc += buf_size;

        rbuf->rbuf_extra_size = totalloc;
        rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
}




/**
 * @brief Convenience writer iterator interface.
 *
 *        After writing to \p *p the caller must update the written length
 *        by calling rd_buf_write(rbuf, NULL, written_length)
 *
 * @returns the number of contiguous writable bytes in segment
 *          and sets \p *p to point to the start of the memory region.
383 */ 384 static size_t 385 rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) { 386 rd_segment_t *seg; 387 388 for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) { 389 size_t len = rd_segment_write_remains(seg, p); 390 391 /* Even though the write offset hasn't changed we 392 * avoid future segment scans by adjusting the 393 * wpos here to the first writable segment. */ 394 rbuf->rbuf_wpos = seg; 395 if (segp) 396 *segp = seg; 397 398 if (unlikely(len == 0)) 399 continue; 400 401 /* Also adjust absof if the segment was allocated 402 * before the previous segment's memory was exhausted 403 * and thus now might have a lower absolute offset 404 * than the previos segment's now higher relative offset. */ 405 if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len) 406 seg->seg_absof = rbuf->rbuf_len; 407 408 return len; 409 } 410 411 return 0; 412 } 413 414 size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p) { 415 rd_segment_t *seg; 416 return rd_buf_get_writable0(rbuf, &seg, p); 417 } 418 419 420 421 422 /** 423 * @brief Write \p payload of \p size bytes to current position 424 * in buffer. A new segment will be allocated and appended 425 * if needed. 426 * 427 * @returns the write position where payload was written (pre-write). 428 * Returning the pre-positition allows write_update() to later 429 * update the same location, effectively making write()s 430 * also a place-holder mechanism. 431 * 432 * @remark If \p payload is NULL only the write position is updated, 433 * in this mode it is required for the buffer to have enough 434 * memory for the NULL write (as it would otherwise cause 435 * uninitialized memory in any new segments allocated from this 436 * function). 
437 */ 438 size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) { 439 size_t remains = size; 440 size_t initial_absof; 441 const char *psrc = (const char *)payload; 442 443 initial_absof = rbuf->rbuf_len; 444 445 /* Ensure enough space by pre-allocating segments. */ 446 rd_buf_write_ensure(rbuf, size, 0); 447 448 while (remains > 0) { 449 void *p = NULL; 450 rd_segment_t *seg = NULL; 451 size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p); 452 size_t wlen = RD_MIN(remains, segremains); 453 454 rd_dassert(seg == rbuf->rbuf_wpos); 455 rd_dassert(wlen > 0); 456 rd_dassert(seg->seg_p+seg->seg_of <= (char *)p && 457 (char *)p < seg->seg_p+seg->seg_size); 458 459 if (payload) { 460 memcpy(p, psrc, wlen); 461 psrc += wlen; 462 } 463 464 seg->seg_of += wlen; 465 rbuf->rbuf_len += wlen; 466 remains -= wlen; 467 } 468 469 rd_assert(remains == 0); 470 471 return initial_absof; 472 } 473 474 475 476 /** 477 * @brief Write \p slice to \p rbuf 478 * 479 * @remark The slice position will be updated. 480 * 481 * @returns the number of bytes witten (always slice length) 482 */ 483 size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice) { 484 const void *p; 485 size_t rlen; 486 size_t sum = 0; 487 488 while ((rlen = rd_slice_reader(slice, &p))) { 489 size_t r; 490 r = rd_buf_write(rbuf, p, rlen); 491 rd_dassert(r != 0); 492 sum += r; 493 } 494 495 return sum; 496 } 497 498 499 500 /** 501 * @brief Write \p payload of \p size at absolute offset \p absof 502 * WITHOUT updating the total buffer length. 503 * 504 * This is used to update a previously written region, such 505 * as updating the header length. 506 * 507 * @returns the number of bytes written, which may be less than \p size 508 * if the update spans multiple segments. 
 */
static size_t rd_segment_write_update (rd_segment_t *seg, size_t absof,
                                       const void *payload, size_t size) {
        size_t relof;
        size_t wlen;

        rd_dassert(absof >= seg->seg_absof);
        relof = absof - seg->seg_absof;
        rd_assert(relof <= seg->seg_of);
        /* Clamp the write to the already-written part (seg_of) of this
         * segment; the caller continues in the next segment. */
        wlen = RD_MIN(size, seg->seg_of - relof);
        rd_dassert(relof + wlen <= seg->seg_of);

        memcpy(seg->seg_p+relof, payload, wlen);

        return wlen;
}



/**
 * @brief Write \p payload of \p size at absolute offset \p absof
 *        WITHOUT updating the total buffer length.
 *
 *        This is used to update a previously written region, such
 *        as updating the header length.
 *
 * @returns the total number of bytes updated.
 */
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
                            const void *payload, size_t size) {
        rd_segment_t *seg;
        const char *psrc = (const char *)payload;
        size_t of;

        /* Find segment for offset */
        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        rd_assert(seg && *"invalid absolute offset");

        /* Update segment by segment until all \p size bytes are written. */
        for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
                rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
                size_t wlen = rd_segment_write_update(seg, absof+of,
                                                      psrc+of, size-of);
                of += wlen;
        }

        rd_dassert(of == size);

        return of;
}



/**
 * @brief Push reference memory segment to current write position.
 *
 * @param free_cb optional callback used to free \p payload when the
 *        segment is destroyed (ownership transfer), or NULL.
 * @param writable if false the segment is marked read-only.
 */
void rd_buf_push0 (rd_buf_t *rbuf, const void *payload, size_t size,
                   void (*free_cb)(void *), rd_bool_t writable) {
        rd_segment_t *prevseg, *seg, *tailseg = NULL;

        if ((prevseg = rbuf->rbuf_wpos) &&
            rd_segment_write_remains(prevseg, NULL) > 0) {
                /* If the current segment still has room in it split it
                 * and insert the pushed segment in the middle (below). */
                tailseg = rd_segment_split(rbuf, prevseg,
                                           prevseg->seg_absof +
                                           prevseg->seg_of);
        }

        seg = rd_buf_alloc_segment0(rbuf, 0);
        seg->seg_p = (char *)payload;
        seg->seg_size = size;
        seg->seg_of = size;
        seg->seg_free = free_cb;
        if (!writable)
                seg->seg_flags |= RD_SEGMENT_F_RDONLY;

        rd_buf_append_segment(rbuf, seg);

        if (tailseg)
                rd_buf_append_segment(rbuf, tailseg);
}



/**
 * @brief Erase \p size bytes at \p absof from buffer.
 *
 * @returns the number of bytes erased.
 *
 * @remark This is costly since it forces a memory move.
 */
size_t rd_buf_erase (rd_buf_t *rbuf, size_t absof, size_t size) {
        rd_segment_t *seg, *next = NULL;
        size_t of;

        /* Find segment for offset */
        seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof);

        /* Adjust segments until size is exhausted, then continue scanning to
         * update the absolute offset. */
        for (of = 0 ; seg && of < size ; seg = next) {
                /* Example:
                 *   seg_absof = 10
                 *   seg_of    = 7
                 *   absof     = 12
                 *   of        = 1
                 *   size      = 4
                 *
                 *   rof          = 3  relative segment offset where to erase
                 *   eraseremains = 3  remaining bytes to erase
                 *   toerase      = 3  available bytes to erase in segment
                 *   segremains   = 1  remaining bytes in segment after to
                 *                     the right of the erased part, i.e.,
                 *                     the memory that needs to be moved to
                 *                     the left.
                 */
                /** Relative offset in segment for the absolute offset */
                size_t rof = (absof + of) - seg->seg_absof;
                /** How much remains to be erased */
                size_t eraseremains = size - of;
                /** How much can be erased from this segment */
                size_t toerase = RD_MIN(seg->seg_of - rof, eraseremains);
                /** How much remains in the segment after the erased part */
                size_t segremains = seg->seg_of - (rof + toerase);

                next = TAILQ_NEXT(seg, seg_link);

                /* Shift this segment's absolute offset left by the number
                 * of bytes erased from preceding segments so far. */
                seg->seg_absof -= of;

                if (unlikely(toerase == 0))
                        continue;

                if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
                        RD_BUG("rd_buf_erase() called on read-only segment");

                /* Move the segment tail over the erased region. */
                if (likely(segremains > 0))
                        memmove(seg->seg_p+rof, seg->seg_p+rof+toerase,
                                segremains);

                seg->seg_of -= toerase;
                rbuf->rbuf_len -= toerase;

                of += toerase;

                /* If segment is now empty, remove it */
                if (seg->seg_of == 0)
                        rd_buf_destroy_segment(rbuf, seg);
        }

        /* Update absolute offset of remaining segments */
        for (seg = next ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                rd_assert(seg->seg_absof >= of);
                seg->seg_absof -= of;
        }

        rbuf->rbuf_erased += of;

        return of;
}




/**
 * @brief Do a write-seek, updating the write position to the given
 *        absolute \p absof.
 *
 * @warning Any sub-sequent segments will be destroyed.
 *
 * @returns -1 if the offset is out of bounds, else 0.
 */
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
        rd_segment_t *seg, *next;
        size_t relof;

        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        if (unlikely(!seg))
                return -1;

        relof = absof - seg->seg_absof;
        if (unlikely(relof > seg->seg_of))
                return -1;

        /* Destroy sub-sequent segments in reverse order so that
         * destroy_segment() length checks are correct.
         * Will decrement rbuf_len et.al. */
        for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
             next != seg ; ) {
                rd_segment_t *this = next;
                next = TAILQ_PREV(this, rd_segment_head, seg_link);
                rd_buf_destroy_segment(rbuf, this);
        }

        /* Update relative write offset */
        seg->seg_of = relof;
        rbuf->rbuf_wpos = seg;
        rbuf->rbuf_len = seg->seg_absof + seg->seg_of;

        rd_assert(rbuf->rbuf_len == absof);

        return 0;
}


/**
 * @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable
 *        segments from the buffer's current write position.
 *
 * @param iovcntp will be set to the number of populated \p iovs[]
 * @param size_max limits the total number of bytes made available.
 *        Note: this value may be overshot with the size of one
 *        segment.
 *
 * @returns the total number of bytes in the represented segments.
 *
 * @remark the write position will NOT be updated.
 */
size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
                             struct iovec *iovs, size_t *iovcntp,
                             size_t iov_max, size_t size_max) {
        const rd_segment_t *seg;
        size_t iovcnt = 0;
        size_t sum = 0;

        for (seg = rbuf->rbuf_wpos ;
             seg && iovcnt < iov_max && sum < size_max ;
             seg = TAILQ_NEXT(seg, seg_link)) {
                size_t len;
                void *p;

                len = rd_segment_write_remains(seg, &p);
                if (unlikely(len == 0))
                        continue; /* Fully written segment: skip. */

                iovs[iovcnt].iov_base = p;
                iovs[iovcnt++].iov_len = len;

                sum += len;
        }

        *iovcntp = iovcnt;

        return sum;
}











/**
 * @name Slice reader interface
 *
 * @{
 */

/**
 * @brief Initialize a new slice of \p size bytes starting at \p seg with
 *        relative offset \p rof.
 *
 * @returns 0 on success or -1 if there is not at least \p size bytes available
 *          in the buffer.
 */
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
                       const rd_segment_t *seg, size_t rof, size_t size) {
        /* Verify that \p size bytes are indeed available in the buffer. */
        if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
                return -1;

        slice->buf = rbuf;
        slice->seg = seg;
        slice->rof = rof;
        slice->start = seg->seg_absof + rof;
        slice->end = slice->start + size;

        rd_assert(seg->seg_absof+rof >= slice->start &&
                  seg->seg_absof+rof <= slice->end);

        rd_assert(slice->end <= rd_buf_len(rbuf));

        return 0;
}

/**
 * @brief Initialize new slice of \p size bytes starting at offset \p absof
 *
 * @returns 0 on success or -1 if there is not at least \p size bytes available
 *          in the buffer.
 */
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
                   size_t absof, size_t size) {
        const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
                                                               absof);
        if (unlikely(!seg))
                return -1;

        return rd_slice_init_seg(slice, rbuf, seg,
                                 absof - seg->seg_absof, size);
}

/**
 * @brief Initialize new slice covering the full buffer \p rbuf
 */
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
        int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
        rd_assert(r == 0);
}



/**
 * @brief Underlying slice reader implementation.
 *
 * @param update_pos if non-zero the slice's read position is advanced
 *        past the returned region, else it is left untouched.
 *
 * @sa rd_slice_reader() rd_slice_peeker()
 */
size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
        size_t rof = slice->rof;
        size_t rlen;
        const rd_segment_t *seg;

        /* Find segment with non-zero payload */
        for (seg = slice->seg ;
             seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
             seg = TAILQ_NEXT(seg, seg_link))
                rof = 0; /* Subsequent segments are read from their start. */

        if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
                return 0;

        *p = (const void *)(seg->seg_p + rof);
        rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));

        if (update_pos) {
                if (slice->seg != seg) {
                        rd_assert(seg->seg_absof + rof >= slice->start &&
                                  seg->seg_absof + rof+rlen <= slice->end);
                        slice->seg = seg;
                        slice->rof = rlen; /* rof was reset to 0 when the
                                            * segment was advanced above. */
                } else {
                        slice->rof += rlen;
                }
        }

        return rlen;
}


/**
 * @brief Convenience reader iterator interface.
 *
 *        Call repeatedly from while loop until it returns 0.
 *
 * @param slice slice to read from, position will be updated.
 * @param p will be set to the start of the contiguous memory region
 *          whose length is given by the return value.
 *
 * @returns the number of bytes read, or 0 if slice is empty.
 */
size_t rd_slice_reader (rd_slice_t *slice, const void **p) {
        return rd_slice_reader0(slice, p, 1/*update_pos*/);
}

/**
 * @brief Identical to rd_slice_reader() but does NOT update the read position
 */
size_t rd_slice_peeker (const rd_slice_t *slice, const void **p) {
        return rd_slice_reader0((rd_slice_t *)slice, p, 0/*dont update_pos*/);
}





/**
 * @brief Read \p size bytes from current read position,
 *        advancing the read offset by the number of bytes copied to \p dst.
 *
 *        If there are less than \p size remaining in the buffer
 *        then 0 is returned and no bytes are copied.
 *
 * @returns \p size, or 0 if \p size bytes are not available in buffer.
 *
 * @remark This performs a complete read, no partial reads.
 *
 * @remark If \p dst is NULL only the read position is updated.
895 */ 896 size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) { 897 size_t remains = size; 898 char *d = (char *)dst; /* Possibly NULL */ 899 size_t rlen; 900 const void *p; 901 size_t orig_end = slice->end; 902 903 if (unlikely(rd_slice_remains(slice) < size)) 904 return 0; 905 906 /* Temporarily shrink slice to offset + \p size */ 907 slice->end = rd_slice_abs_offset(slice) + size; 908 909 while ((rlen = rd_slice_reader(slice, &p))) { 910 rd_dassert(remains >= rlen); 911 if (dst) { 912 memcpy(d, p, rlen); 913 d += rlen; 914 } 915 remains -= rlen; 916 } 917 918 rd_dassert(remains == 0); 919 920 /* Restore original size */ 921 slice->end = orig_end; 922 923 return size; 924 } 925 926 927 /** 928 * @brief Read \p size bytes from absolute slice offset \p offset 929 * and store in \p dst, without updating the slice read position. 930 * 931 * @returns \p size if the offset and size was within the slice, else 0. 932 */ 933 size_t rd_slice_peek (const rd_slice_t *slice, size_t offset, 934 void *dst, size_t size) { 935 rd_slice_t sub = *slice; 936 937 if (unlikely(rd_slice_seek(&sub, offset) == -1)) 938 return 0; 939 940 return rd_slice_read(&sub, dst, size); 941 942 } 943 944 945 /** 946 * @brief Read a varint-encoded unsigned integer from \p slice, 947 * storing the decoded number in \p nump on success (return value > 0). 948 * 949 * @returns the number of bytes read on success or 0 in case of 950 * buffer underflow. 951 */ 952 size_t rd_slice_read_uvarint (rd_slice_t *slice, uint64_t *nump) { 953 uint64_t num = 0; 954 int shift = 0; 955 size_t rof = slice->rof; 956 const rd_segment_t *seg; 957 958 /* Traverse segments, byte for byte, until varint is decoded 959 * or no more segments available (underflow). 
*/ 960 for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) { 961 for ( ; rof < seg->seg_of ; rof++) { 962 unsigned char oct; 963 964 if (unlikely(seg->seg_absof+rof >= slice->end)) 965 return 0; /* Underflow */ 966 967 oct = *(const unsigned char *)(seg->seg_p + rof); 968 969 num |= (uint64_t)(oct & 0x7f) << shift; 970 shift += 7; 971 972 if (!(oct & 0x80)) { 973 /* Done: no more bytes expected */ 974 *nump = num; 975 976 /* Update slice's read pointer and offset */ 977 if (slice->seg != seg) 978 slice->seg = seg; 979 slice->rof = rof + 1; /* including the +1 byte 980 * that was just read */ 981 982 return shift / 7; 983 } 984 } 985 986 rof = 0; 987 } 988 989 return 0; /* Underflow */ 990 } 991 992 993 /** 994 * @returns a pointer to \p size contiguous bytes at the current read offset. 995 * If there isn't \p size contiguous bytes available NULL will 996 * be returned. 997 * 998 * @remark The read position is updated to point past \p size. 999 */ 1000 const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size) { 1001 void *p; 1002 1003 if (unlikely(rd_slice_remains(slice) < size || 1004 slice->rof + size > slice->seg->seg_of)) 1005 return NULL; 1006 1007 p = slice->seg->seg_p + slice->rof; 1008 1009 rd_slice_read(slice, NULL, size); 1010 1011 return p; 1012 } 1013 1014 1015 1016 /** 1017 * @brief Sets the slice's read position. The offset is the slice offset, 1018 * not buffer offset. 1019 * 1020 * @returns 0 if offset was within range, else -1 in which case the position 1021 * is not changed. 
1022 */ 1023 int rd_slice_seek (rd_slice_t *slice, size_t offset) { 1024 const rd_segment_t *seg; 1025 size_t absof = slice->start + offset; 1026 1027 if (unlikely(absof >= slice->end)) 1028 return -1; 1029 1030 seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof); 1031 rd_assert(seg); 1032 1033 slice->seg = seg; 1034 slice->rof = absof - seg->seg_absof; 1035 rd_assert(seg->seg_absof + slice->rof >= slice->start && 1036 seg->seg_absof + slice->rof <= slice->end); 1037 1038 return 0; 1039 } 1040 1041 1042 /** 1043 * @brief Narrow the current slice to \p size, saving 1044 * the original slice state info \p save_slice. 1045 * 1046 * Use rd_slice_widen() to restore the saved slice 1047 * with the read count updated from the narrowed slice. 1048 * 1049 * This is useful for reading a sub-slice of a larger slice 1050 * without having to pass the lesser length around. 1051 * 1052 * @returns 1 if enough underlying slice buffer memory is available, else 0. 1053 */ 1054 int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size) { 1055 if (unlikely(slice->start + size > slice->end)) 1056 return 0; 1057 *save_slice = *slice; 1058 slice->end = slice->start + size; 1059 rd_assert(rd_slice_abs_offset(slice) <= slice->end); 1060 return 1; 1061 } 1062 1063 /** 1064 * @brief Same as rd_slice_narrow() but using a relative size \p relsize 1065 * from the current read position. 1066 */ 1067 int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice, 1068 size_t relsize) { 1069 return rd_slice_narrow(slice, save_slice, 1070 rd_slice_offset(slice) + relsize); 1071 } 1072 1073 1074 /** 1075 * @brief Restore the original \p save_slice size from a previous call to 1076 * rd_slice_narrow(), while keeping the updated read pointer from 1077 * \p slice. 
1078 */ 1079 void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice) { 1080 slice->end = save_slice->end; 1081 } 1082 1083 1084 /** 1085 * @brief Copy the original slice \p orig to \p new_slice and adjust 1086 * the new slice length to \p size. 1087 * 1088 * This is a side-effect free form of rd_slice_narrow() which is not to 1089 * be used with rd_slice_widen(). 1090 * 1091 * @returns 1 if enough underlying slice buffer memory is available, else 0. 1092 */ 1093 int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice, 1094 size_t size) { 1095 if (unlikely(orig->start + size > orig->end)) 1096 return 0; 1097 *new_slice = *orig; 1098 new_slice->end = orig->start + size; 1099 rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end); 1100 return 1; 1101 } 1102 1103 /** 1104 * @brief Same as rd_slice_narrow_copy() but with a relative size from 1105 * the current read position. 1106 */ 1107 int rd_slice_narrow_copy_relative (const rd_slice_t *orig, 1108 rd_slice_t *new_slice, 1109 size_t relsize) { 1110 return rd_slice_narrow_copy(orig, new_slice, 1111 rd_slice_offset(orig) + relsize); 1112 } 1113 1114 1115 1116 1117 1118 /** 1119 * @brief Set up the iovec \p iovs (of size \p iov_max) with the readable 1120 * segments from the slice's current read position. 1121 * 1122 * @param iovcntp will be set to the number of populated \p iovs[] 1123 * @param size_max limits the total number of bytes made available. 1124 * Note: this value may be overshot with the size of one 1125 * segment. 1126 * 1127 * @returns the total number of bytes in the represented segments. 1128 * 1129 * @remark will NOT update the read position. 
1130 */ 1131 size_t rd_slice_get_iov (const rd_slice_t *slice, 1132 struct iovec *iovs, size_t *iovcntp, 1133 size_t iov_max, size_t size_max) { 1134 const void *p; 1135 size_t rlen; 1136 size_t iovcnt = 0; 1137 size_t sum = 0; 1138 rd_slice_t copy = *slice; /* Use a copy of the slice so we dont 1139 * update the position for the caller. */ 1140 1141 while (sum < size_max && iovcnt < iov_max && 1142 (rlen = rd_slice_reader(©, &p))) { 1143 iovs[iovcnt].iov_base = (void *)p; 1144 iovs[iovcnt++].iov_len = rlen; 1145 1146 sum += rlen; 1147 } 1148 1149 *iovcntp = iovcnt; 1150 1151 return sum; 1152 } 1153 1154 1155 1156 1157 1158 /** 1159 * @brief CRC32 calculation of slice. 1160 * 1161 * @returns the calculated CRC 1162 * 1163 * @remark the slice's position is updated. 1164 */ 1165 uint32_t rd_slice_crc32 (rd_slice_t *slice) { 1166 rd_crc32_t crc; 1167 const void *p; 1168 size_t rlen; 1169 1170 crc = rd_crc32_init(); 1171 1172 while ((rlen = rd_slice_reader(slice, &p))) 1173 crc = rd_crc32_update(crc, p, rlen); 1174 1175 return (uint32_t)rd_crc32_finalize(crc); 1176 } 1177 1178 /** 1179 * @brief Compute CRC-32C of segments starting at at buffer position \p absof, 1180 * also supporting the case where the position/offset is not at the 1181 * start of the first segment. 1182 * 1183 * @remark the slice's position is updated. 
1184 */ 1185 uint32_t rd_slice_crc32c (rd_slice_t *slice) { 1186 const void *p; 1187 size_t rlen; 1188 uint32_t crc = 0; 1189 1190 while ((rlen = rd_slice_reader(slice, &p))) 1191 crc = crc32c(crc, (const char *)p, rlen); 1192 1193 return crc; 1194 } 1195 1196 1197 1198 1199 1200 /** 1201 * @name Debugging dumpers 1202 * 1203 * 1204 */ 1205 1206 static void rd_segment_dump (const rd_segment_t *seg, const char *ind, 1207 size_t relof, int do_hexdump) { 1208 fprintf(stderr, 1209 "%s((rd_segment_t *)%p): " 1210 "p %p, of %"PRIusz", " 1211 "absof %"PRIusz", size %"PRIusz", free %p, flags 0x%x\n", 1212 ind, seg, seg->seg_p, seg->seg_of, 1213 seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags); 1214 rd_assert(relof <= seg->seg_of); 1215 if (do_hexdump) 1216 rd_hexdump(stderr, "segment", 1217 seg->seg_p+relof, seg->seg_of-relof); 1218 } 1219 1220 void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump) { 1221 const rd_segment_t *seg; 1222 1223 fprintf(stderr, 1224 "((rd_buf_t *)%p):\n" 1225 " len %"PRIusz" size %"PRIusz 1226 ", %"PRIusz"/%"PRIusz" extra memory used\n", 1227 rbuf, rbuf->rbuf_len, rbuf->rbuf_size, 1228 rbuf->rbuf_extra_len, rbuf->rbuf_extra_size); 1229 1230 if (rbuf->rbuf_wpos) { 1231 fprintf(stderr, " wpos:\n"); 1232 rd_segment_dump(rbuf->rbuf_wpos, " ", 0, 0); 1233 } 1234 1235 if (rbuf->rbuf_segment_cnt > 0) { 1236 size_t segcnt = 0; 1237 1238 fprintf(stderr, " %"PRIusz" linked segments:\n", 1239 rbuf->rbuf_segment_cnt); 1240 TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) { 1241 rd_segment_dump(seg, " ", 0, do_hexdump); 1242 segcnt++; 1243 rd_assert(segcnt <= rbuf->rbuf_segment_cnt); 1244 } 1245 } 1246 } 1247 1248 void rd_slice_dump (const rd_slice_t *slice, int do_hexdump) { 1249 const rd_segment_t *seg; 1250 size_t relof; 1251 1252 fprintf(stderr, 1253 "((rd_slice_t *)%p):\n" 1254 " buf %p (len %"PRIusz"), seg %p (absof %"PRIusz"), " 1255 "rof %"PRIusz", start %"PRIusz", end %"PRIusz", size %"PRIusz 1256 ", offset %"PRIusz"\n", 1257 slice, 
slice->buf, rd_buf_len(slice->buf), 1258 slice->seg, slice->seg ? slice->seg->seg_absof : 0, 1259 slice->rof, slice->start, slice->end, 1260 rd_slice_size(slice), rd_slice_offset(slice)); 1261 relof = slice->rof; 1262 1263 for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) { 1264 rd_segment_dump(seg, " ", relof, do_hexdump); 1265 relof = 0; 1266 } 1267 } 1268 1269 1270 /** 1271 * @name Unit-tests 1272 * 1273 * 1274 * 1275 */ 1276 1277 1278 /** 1279 * @brief Basic write+read test 1280 */ 1281 static int do_unittest_write_read (void) { 1282 rd_buf_t b; 1283 char ones[1024]; 1284 char twos[1024]; 1285 char threes[1024]; 1286 char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ 1287 char buf[1024*3]; 1288 rd_slice_t slice; 1289 size_t r, pos; 1290 1291 memset(ones, 0x1, sizeof(ones)); 1292 memset(twos, 0x2, sizeof(twos)); 1293 memset(threes, 0x3, sizeof(threes)); 1294 memset(fiftyfives, 0x55, sizeof(fiftyfives)); 1295 memset(buf, 0x55, sizeof(buf)); 1296 1297 rd_buf_init(&b, 2, 1000); 1298 1299 /* 1300 * Verify write 1301 */ 1302 r = rd_buf_write(&b, ones, 200); 1303 RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r); 1304 pos = rd_buf_write_pos(&b); 1305 RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos); 1306 1307 r = rd_buf_write(&b, twos, 800); 1308 RD_UT_ASSERT(r == 200, "write() returned position %"PRIusz, r); 1309 pos = rd_buf_write_pos(&b); 1310 RD_UT_ASSERT(pos == 200+800, "pos() returned position %"PRIusz, pos); 1311 1312 /* Buffer grows here */ 1313 r = rd_buf_write(&b, threes, 1); 1314 RD_UT_ASSERT(pos == 200+800, 1315 "write() returned position %"PRIusz, r); 1316 pos = rd_buf_write_pos(&b); 1317 RD_UT_ASSERT(pos == 200+800+1, "pos() returned position %"PRIusz, pos); 1318 1319 /* 1320 * Verify read 1321 */ 1322 /* Get full slice. 
*/ 1323 rd_slice_init_full(&slice, &b); 1324 1325 r = rd_slice_read(&slice, buf, 200+800+2); 1326 RD_UT_ASSERT(r == 0, 1327 "read() > remaining should have failed, gave %"PRIusz, r); 1328 r = rd_slice_read(&slice, buf, 200+800+1); 1329 RD_UT_ASSERT(r == 200+800+1, 1330 "read() returned %"PRIusz" (%"PRIusz" remains)", 1331 r, rd_slice_remains(&slice)); 1332 1333 RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones"); 1334 RD_UT_ASSERT(!memcmp(buf+200, twos, 800), "verify twos"); 1335 RD_UT_ASSERT(!memcmp(buf+200+800, threes, 1), "verify threes"); 1336 RD_UT_ASSERT(!memcmp(buf+200+800+1, fiftyfives, 100), "verify 55s"); 1337 1338 rd_buf_destroy(&b); 1339 1340 RD_UT_PASS(); 1341 } 1342 1343 1344 /** 1345 * @brief Helper read verifier, not a unit-test itself. 1346 */ 1347 #define do_unittest_read_verify(b,absof,len,verify) do { \ 1348 int __fail = do_unittest_read_verify0(b,absof,len,verify); \ 1349 RD_UT_ASSERT(!__fail, \ 1350 "read_verify(absof=%"PRIusz",len=%"PRIusz") " \ 1351 "failed", (size_t)absof, (size_t)len); \ 1352 } while (0) 1353 1354 static int 1355 do_unittest_read_verify0 (const rd_buf_t *b, size_t absof, size_t len, 1356 const char *verify) { 1357 rd_slice_t slice, sub; 1358 char buf[1024]; 1359 size_t half; 1360 size_t r; 1361 int i; 1362 1363 rd_assert(sizeof(buf) >= len); 1364 1365 /* Get reader slice */ 1366 i = rd_slice_init(&slice, b, absof, len); 1367 RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i); 1368 1369 r = rd_slice_read(&slice, buf, len); 1370 RD_UT_ASSERT(r == len, 1371 "read() returned %"PRIusz" expected %"PRIusz 1372 " (%"PRIusz" remains)", 1373 r, len, rd_slice_remains(&slice)); 1374 1375 RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); 1376 1377 r = rd_slice_offset(&slice); 1378 RD_UT_ASSERT(r == len, "offset() returned %"PRIusz", not %"PRIusz, 1379 r, len); 1380 1381 half = len / 2; 1382 i = rd_slice_seek(&slice, half); 1383 RD_UT_ASSERT(i == 0, "seek(%"PRIusz") returned %d", half, i); 1384 r = rd_slice_offset(&slice); 1385 
RD_UT_ASSERT(r == half, "offset() returned %"PRIusz", not %"PRIusz, 1386 r, half); 1387 1388 /* Get a sub-slice covering the later half. */ 1389 sub = rd_slice_pos(&slice); 1390 r = rd_slice_offset(&sub); 1391 RD_UT_ASSERT(r == 0, "sub: offset() returned %"PRIusz", not %"PRIusz, 1392 r, (size_t)0); 1393 r = rd_slice_size(&sub); 1394 RD_UT_ASSERT(r == half, "sub: size() returned %"PRIusz", not %"PRIusz, 1395 r, half); 1396 r = rd_slice_remains(&sub); 1397 RD_UT_ASSERT(r == half, 1398 "sub: remains() returned %"PRIusz", not %"PRIusz, 1399 r, half); 1400 1401 /* Read half */ 1402 r = rd_slice_read(&sub, buf, half); 1403 RD_UT_ASSERT(r == half, 1404 "sub read() returned %"PRIusz" expected %"PRIusz 1405 " (%"PRIusz" remains)", 1406 r, len, rd_slice_remains(&sub)); 1407 1408 RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); 1409 1410 r = rd_slice_offset(&sub); 1411 RD_UT_ASSERT(r == rd_slice_size(&sub), 1412 "sub offset() returned %"PRIusz", not %"PRIusz, 1413 r, rd_slice_size(&sub)); 1414 r = rd_slice_remains(&sub); 1415 RD_UT_ASSERT(r == 0, 1416 "sub: remains() returned %"PRIusz", not %"PRIusz, 1417 r, (size_t)0); 1418 1419 return 0; 1420 } 1421 1422 1423 /** 1424 * @brief write_seek() and split() test 1425 */ 1426 static int do_unittest_write_split_seek (void) { 1427 rd_buf_t b; 1428 char ones[1024]; 1429 char twos[1024]; 1430 char threes[1024]; 1431 char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ 1432 char buf[1024*3]; 1433 size_t r, pos; 1434 rd_segment_t *seg, *newseg; 1435 1436 memset(ones, 0x1, sizeof(ones)); 1437 memset(twos, 0x2, sizeof(twos)); 1438 memset(threes, 0x3, sizeof(threes)); 1439 memset(fiftyfives, 0x55, sizeof(fiftyfives)); 1440 memset(buf, 0x55, sizeof(buf)); 1441 1442 rd_buf_init(&b, 0, 0); 1443 1444 /* 1445 * Verify write 1446 */ 1447 r = rd_buf_write(&b, ones, 400); 1448 RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r); 1449 pos = rd_buf_write_pos(&b); 1450 RD_UT_ASSERT(pos == 400, "pos() returned position %"PRIusz, 
pos); 1451 1452 do_unittest_read_verify(&b, 0, 400, ones); 1453 1454 /* 1455 * Seek and re-write 1456 */ 1457 r = rd_buf_write_seek(&b, 200); 1458 RD_UT_ASSERT(r == 0, "seek() failed"); 1459 pos = rd_buf_write_pos(&b); 1460 RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos); 1461 1462 r = rd_buf_write(&b, twos, 100); 1463 RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r); 1464 pos = rd_buf_write_pos(&b); 1465 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos); 1466 1467 do_unittest_read_verify(&b, 0, 200, ones); 1468 do_unittest_read_verify(&b, 200, 100, twos); 1469 1470 /* Make sure read() did not modify the write position. */ 1471 pos = rd_buf_write_pos(&b); 1472 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos); 1473 1474 /* Split buffer, write position is now at split where writes 1475 * are not allowed (mid buffer). */ 1476 seg = rd_buf_get_segment_at_offset(&b, NULL, 50); 1477 RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment"); 1478 newseg = rd_segment_split(&b, seg, 50); 1479 rd_buf_append_segment(&b, newseg); 1480 seg = rd_buf_get_segment_at_offset(&b, NULL, 50); 1481 RD_UT_ASSERT(seg != NULL, "seg"); 1482 RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg); 1483 RD_UT_ASSERT(seg->seg_of > 0, 1484 "assumed beginning of segment, got %"PRIusz, seg->seg_of); 1485 1486 pos = rd_buf_write_pos(&b); 1487 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos); 1488 1489 /* Re-verify that nothing changed */ 1490 do_unittest_read_verify(&b, 0, 200, ones); 1491 do_unittest_read_verify(&b, 200, 100, twos); 1492 1493 /* Do a write seek at buffer boundary, sub-sequent buffers should 1494 * be destroyed. */ 1495 r = rd_buf_write_seek(&b, 50); 1496 RD_UT_ASSERT(r == 0, "seek() failed"); 1497 do_unittest_read_verify(&b, 0, 50, ones); 1498 1499 rd_buf_destroy(&b); 1500 1501 RD_UT_PASS(); 1502 } 1503 1504 /** 1505 * @brief Unittest to verify payload is correctly written and read. 
1506 * Each written u32 word is the running CRC of the word count. 1507 */ 1508 static int do_unittest_write_read_payload_correctness (void) { 1509 uint32_t crc; 1510 uint32_t write_crc, read_crc; 1511 const int seed = 12345; 1512 rd_buf_t b; 1513 const size_t max_cnt = 20000; 1514 rd_slice_t slice; 1515 size_t r; 1516 size_t i; 1517 int pass; 1518 1519 crc = rd_crc32_init(); 1520 crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); 1521 1522 rd_buf_init(&b, 0, 0); 1523 for (i = 0 ; i < max_cnt ; i++) { 1524 crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); 1525 rd_buf_write(&b, &crc, sizeof(crc)); 1526 } 1527 1528 write_crc = rd_crc32_finalize(crc); 1529 1530 r = rd_buf_len(&b); 1531 RD_UT_ASSERT(r == max_cnt * sizeof(crc), 1532 "expected length %"PRIusz", not %"PRIusz, 1533 r, max_cnt * sizeof(crc)); 1534 1535 /* 1536 * Now verify the contents with a reader. 1537 */ 1538 rd_slice_init_full(&slice, &b); 1539 1540 r = rd_slice_remains(&slice); 1541 RD_UT_ASSERT(r == rd_buf_len(&b), 1542 "slice remains %"PRIusz", should be %"PRIusz, 1543 r, rd_buf_len(&b)); 1544 1545 for (pass = 0 ; pass < 2 ; pass++) { 1546 /* Two passes: 1547 * - pass 1: using peek() 1548 * - pass 2: using read() 1549 */ 1550 const char *pass_str = pass == 0 ? 
"peek":"read"; 1551 1552 crc = rd_crc32_init(); 1553 crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); 1554 1555 for (i = 0 ; i < max_cnt ; i++) { 1556 uint32_t buf_crc; 1557 1558 crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); 1559 1560 if (pass == 0) 1561 r = rd_slice_peek(&slice, i * sizeof(buf_crc), 1562 &buf_crc, sizeof(buf_crc)); 1563 else 1564 r = rd_slice_read(&slice, &buf_crc, 1565 sizeof(buf_crc)); 1566 RD_UT_ASSERT(r == sizeof(buf_crc), 1567 "%s() at #%"PRIusz" failed: " 1568 "r is %"PRIusz" not %"PRIusz, 1569 pass_str, i, r, sizeof(buf_crc)); 1570 RD_UT_ASSERT(buf_crc == crc, 1571 "%s: invalid crc at #%"PRIusz 1572 ": expected %"PRIu32", read %"PRIu32, 1573 pass_str, i, crc, buf_crc); 1574 } 1575 1576 read_crc = rd_crc32_finalize(crc); 1577 1578 RD_UT_ASSERT(read_crc == write_crc, 1579 "%s: finalized read crc %"PRIu32 1580 " != write crc %"PRIu32, 1581 pass_str, read_crc, write_crc); 1582 1583 } 1584 1585 r = rd_slice_remains(&slice); 1586 RD_UT_ASSERT(r == 0, 1587 "slice remains %"PRIusz", should be %"PRIusz, 1588 r, (size_t)0); 1589 1590 rd_buf_destroy(&b); 1591 1592 RD_UT_PASS(); 1593 } 1594 1595 #define do_unittest_iov_verify(...) 
do { \ 1596 int __fail = do_unittest_iov_verify0(__VA_ARGS__); \ 1597 RD_UT_ASSERT(!__fail, "iov_verify() failed"); \ 1598 } while (0) 1599 static int do_unittest_iov_verify0 (rd_buf_t *b, 1600 size_t exp_iovcnt, size_t exp_totsize) { 1601 #define MY_IOV_MAX 16 1602 struct iovec iov[MY_IOV_MAX]; 1603 size_t iovcnt; 1604 size_t i; 1605 size_t totsize, sum; 1606 1607 rd_assert(exp_iovcnt <= MY_IOV_MAX); 1608 1609 totsize = rd_buf_get_write_iov(b, iov, &iovcnt, 1610 MY_IOV_MAX, exp_totsize); 1611 RD_UT_ASSERT(totsize >= exp_totsize, 1612 "iov total size %"PRIusz" expected >= %"PRIusz, 1613 totsize, exp_totsize); 1614 RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX, 1615 "iovcnt %"PRIusz 1616 ", expected %"PRIusz" < x <= MY_IOV_MAX", 1617 iovcnt, exp_iovcnt); 1618 1619 sum = 0; 1620 for (i = 0 ; i < iovcnt ; i++) { 1621 RD_UT_ASSERT(iov[i].iov_base, 1622 "iov #%"PRIusz" iov_base not set", i); 1623 RD_UT_ASSERT(iov[i].iov_len, 1624 "iov #%"PRIusz" iov_len %"PRIusz" out of range", 1625 i, iov[i].iov_len); 1626 sum += iov[i].iov_len; 1627 RD_UT_ASSERT(sum <= totsize, "sum %"PRIusz" > totsize %"PRIusz, 1628 sum, totsize); 1629 } 1630 1631 RD_UT_ASSERT(sum == totsize, 1632 "sum %"PRIusz" != totsize %"PRIusz, 1633 sum, totsize); 1634 1635 return 0; 1636 } 1637 1638 1639 /** 1640 * @brief Verify that buffer to iovec conversion works. 1641 */ 1642 static int do_unittest_write_iov (void) { 1643 rd_buf_t b; 1644 1645 rd_buf_init(&b, 0, 0); 1646 rd_buf_write_ensure(&b, 100, 100); 1647 1648 do_unittest_iov_verify(&b, 1, 100); 1649 1650 /* Add a secondary buffer */ 1651 rd_buf_write_ensure(&b, 30000, 0); 1652 1653 do_unittest_iov_verify(&b, 2, 100+30000); 1654 1655 1656 rd_buf_destroy(&b); 1657 1658 RD_UT_PASS(); 1659 } 1660 1661 /** 1662 * @brief Verify that erasing parts of the buffer works. 
1663 */ 1664 static int do_unittest_erase (void) { 1665 static const struct { 1666 const char *segs[4]; 1667 const char *writes[4]; 1668 struct { 1669 size_t of; 1670 size_t size; 1671 size_t retsize; 1672 } erasures[4]; 1673 1674 const char *expect; 1675 } in[] = { 1676 /* 12|3|45 1677 * x x xx */ 1678 { .segs = { "12", "3", "45" }, 1679 .erasures = { { 1, 4, 4 } }, 1680 .expect = "1", 1681 }, 1682 /* 12|3|45 1683 * xx */ 1684 { .segs = { "12", "3", "45" }, 1685 .erasures = { { 0, 2, 2 } }, 1686 .expect = "345", 1687 }, 1688 /* 12|3|45 1689 * xx */ 1690 { .segs = { "12", "3", "45" }, 1691 .erasures = { { 3, 2, 2 } }, 1692 .expect = "123", 1693 }, 1694 /* 12|3|45 1695 * x 1696 * 1 |3|45 1697 * x 1698 * 1 | 45 1699 * x */ 1700 { .segs = { "12", "3", "45" }, 1701 .erasures = { { 1, 1, 1 }, 1702 { 1, 1, 1 }, 1703 { 2, 1, 1 } }, 1704 .expect = "14", 1705 }, 1706 /* 12|3|45 1707 * xxxxxxx */ 1708 { .segs = { "12", "3", "45" }, 1709 .erasures = { { 0, 5, 5 } }, 1710 .expect = "", 1711 }, 1712 /* 12|3|45 1713 * x */ 1714 { .segs = { "12", "3", "45" }, 1715 .erasures = { { 0, 1, 1 } }, 1716 .expect = "2345", 1717 }, 1718 /* 12|3|45 1719 * x */ 1720 { .segs = { "12", "3", "45" }, 1721 .erasures = { { 4, 1, 1 } }, 1722 .expect = "1234", 1723 }, 1724 /* 12|3|45 1725 * x */ 1726 { .segs = { "12", "3", "45" }, 1727 .erasures = { { 5, 10, 0 } }, 1728 .expect = "12345", 1729 }, 1730 /* 12|3|45 1731 * xxx */ 1732 { .segs = { "12", "3", "45" }, 1733 .erasures = { { 4, 3, 1 }, { 4, 3, 0 }, { 4, 3, 0 } }, 1734 .expect = "1234", 1735 }, 1736 /* 1 1737 * xxx */ 1738 { .segs = { "1" }, 1739 .erasures = { { 0, 3, 1 } }, 1740 .expect = "", 1741 }, 1742 /* 123456 1743 * xxxxxx */ 1744 { .segs = { "123456" }, 1745 .erasures = { { 0, 6, 6 } }, 1746 .expect = "", 1747 }, 1748 /* 123456789a 1749 * xxx */ 1750 { .segs = { "123456789a" }, 1751 .erasures = { { 4, 3, 3 } }, 1752 .expect = "123489a", 1753 }, 1754 /* 1234|5678 1755 * x xx */ 1756 { .segs = { "1234", "5678" }, 1757 .erasures = { { 3, 
3, 3 } }, 1758 .writes = { "9abc" }, 1759 .expect = "123789abc" 1760 }, 1761 1762 { .expect = NULL } 1763 }; 1764 int i; 1765 1766 for (i = 0 ; in[i].expect ; i++) { 1767 rd_buf_t b; 1768 rd_slice_t s; 1769 size_t expsz = strlen(in[i].expect); 1770 char *out; 1771 int j; 1772 size_t r; 1773 int r2; 1774 1775 rd_buf_init(&b, 0, 0); 1776 1777 /* Write segments to buffer */ 1778 for (j = 0 ; in[i].segs[j] ; j++) 1779 rd_buf_push_writable(&b, rd_strdup(in[i].segs[j]), 1780 strlen(in[i].segs[j]), rd_free); 1781 1782 /* Perform erasures */ 1783 for (j = 0 ; in[i].erasures[j].retsize ; j++) { 1784 r = rd_buf_erase(&b, 1785 in[i].erasures[j].of, 1786 in[i].erasures[j].size); 1787 RD_UT_ASSERT(r == in[i].erasures[j].retsize, 1788 "expected retsize %"PRIusz" for i=%d,j=%d" 1789 ", not %"PRIusz, 1790 in[i].erasures[j].retsize, i, j, r); 1791 } 1792 1793 /* Perform writes */ 1794 for (j = 0 ; in[i].writes[j] ; j++) 1795 rd_buf_write(&b, in[i].writes[j], 1796 strlen(in[i].writes[j])); 1797 1798 RD_UT_ASSERT(expsz == rd_buf_len(&b), 1799 "expected buffer to be %"PRIusz" bytes, not " 1800 "%"PRIusz" for i=%d", 1801 expsz, rd_buf_len(&b), i); 1802 1803 /* Read back and verify */ 1804 r2 = rd_slice_init(&s, &b, 0, rd_buf_len(&b)); 1805 RD_UT_ASSERT((r2 == -1 && rd_buf_len(&b) == 0) || 1806 (r2 == 0 && rd_buf_len(&b) > 0), 1807 "slice_init(%"PRIusz") returned %d for i=%d", 1808 rd_buf_len(&b), r2, i); 1809 if (r2 == -1) 1810 continue; /* Empty buffer */ 1811 1812 RD_UT_ASSERT(expsz == rd_slice_size(&s), 1813 "expected slice to be %"PRIusz" bytes, not %"PRIusz 1814 " for i=%d", 1815 expsz, rd_slice_size(&s), i); 1816 1817 out = rd_malloc(expsz); 1818 1819 r = rd_slice_read(&s, out, expsz); 1820 RD_UT_ASSERT(r == expsz, 1821 "expected to read %"PRIusz" bytes, not %"PRIusz 1822 " for i=%d", 1823 expsz, r, i); 1824 1825 RD_UT_ASSERT(!memcmp(out, in[i].expect, expsz), 1826 "Expected \"%.*s\", not \"%.*s\" for i=%d", 1827 (int)expsz, in[i].expect, 1828 (int)r, out, i); 1829 1830 
rd_free(out); 1831 1832 RD_UT_ASSERT(rd_slice_remains(&s) == 0, 1833 "expected no remaining bytes in slice, but got " 1834 "%"PRIusz" for i=%d", 1835 rd_slice_remains(&s), i); 1836 1837 rd_buf_destroy(&b); 1838 } 1839 1840 1841 RD_UT_PASS(); 1842 } 1843 1844 1845 int unittest_rdbuf (void) { 1846 int fails = 0; 1847 1848 fails += do_unittest_write_read(); 1849 fails += do_unittest_write_split_seek(); 1850 fails += do_unittest_write_read_payload_correctness(); 1851 fails += do_unittest_write_iov(); 1852 fails += do_unittest_erase(); 1853 1854 return fails; 1855 } 1856