1 /* $OpenBSD: abuf.c,v 1.22 2010/06/04 06:15:28 ratchov Exp $ */ 2 /* 3 * Copyright (c) 2008 Alexandre Ratchov <alex@caoua.org> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 /* 18 * Simple byte fifo. It has one reader and one writer. The abuf 19 * structure is used to interconnect audio processing units (aproc 20 * structures). 21 * 22 * The abuf data is split in two parts: (1) valid data available to the reader 23 * (2) space available to the writer, which is not necessarily unused. It works 24 * as follows: the write starts filling at offset (start + used), once the data 25 * is ready, the writer adds to used the count of bytes available. 26 */ 27 /* 28 * TODO 29 * 30 * use blocks instead of frames for WOK and ROK macros. If necessary 31 * (unlikely) define reader block size and writer blocks size to 32 * ease pipe/socket implementation 33 */ 34 #include <err.h> 35 #include <stdarg.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 40 #include "abuf.h" 41 #include "aparams.h" 42 #include "aproc.h" 43 #include "conf.h" 44 #ifdef DEBUG 45 #include "dbg.h" 46 #endif 47 48 #ifdef DEBUG 49 void 50 abuf_dbg(struct abuf *buf) 51 { 52 if (buf->wproc) { 53 aproc_dbg(buf->wproc); 54 } else { 55 dbg_puts("none"); 56 } 57 dbg_puts(buf->inuse ? "=>" : "->"); 58 if (buf->rproc) { 59 aproc_dbg(buf->rproc); 60 } else { 61 dbg_puts("none"); 62 } 63 } 64 65 void 66 abuf_dump(struct abuf *buf) 67 { 68 abuf_dbg(buf); 69 dbg_puts(": used = "); 70 dbg_putu(buf->used); 71 dbg_puts("/"); 72 dbg_putu(buf->len); 73 dbg_puts(" start = "); 74 dbg_putu(buf->start); 75 dbg_puts("\n"); 76 } 77 #endif 78 79 struct abuf * 80 abuf_new(unsigned nfr, struct aparams *par) 81 { 82 struct abuf *buf; 83 unsigned len, bpf; 84 85 bpf = aparams_bpf(par); 86 len = nfr * bpf; 87 buf = malloc(sizeof(struct abuf) + len); 88 if (buf == NULL) { 89 #ifdef DEBUG 90 dbg_puts("couldn't allocate abuf of "); 91 dbg_putu(nfr); 92 dbg_puts("fr * "); 93 dbg_putu(bpf); 94 dbg_puts("bpf\n"); 95 dbg_panic(); 96 #else 97 err(1, "malloc"); 98 #endif 99 } 100 buf->bpf = bpf; 101 buf->cmin = par->cmin; 102 buf->cmax = par->cmax; 103 buf->inuse = 0; 104 105 /* 106 * fill fifo pointers 107 */ 108 buf->len = nfr; 109 buf->used = 0; 110 buf->start = 0; 111 buf->rproc = NULL; 112 buf->wproc = NULL; 113 buf->duplex = NULL; 114 return buf; 115 } 116 117 void 118 abuf_del(struct abuf *buf) 119 { 120 if (buf->duplex) 121 buf->duplex->duplex = NULL; 122 #ifdef DEBUG 123 if (buf->rproc || buf->wproc) { 124 abuf_dbg(buf); 125 dbg_puts(": can't delete referenced buffer\n"); 126 dbg_panic(); 127 } 128 if (ABUF_ROK(buf)) { 129 /* 130 * XXX : we should call abort(), here. 131 * However, poll() doesn't seem to return POLLHUP, 132 * so the reader is never destroyed; instead it appears 133 * as blocked. Fix file_poll(), if fixable, and add 134 * a call to abord() here. 135 */ 136 if (debug_level >= 3) { 137 abuf_dbg(buf); 138 dbg_puts(": deleting non-empty buffer, used = "); 139 dbg_putu(buf->used); 140 dbg_puts("\n"); 141 } 142 } 143 #endif 144 free(buf); 145 } 146 147 /* 148 * Clear buffer contents. 149 */ 150 void 151 abuf_clear(struct abuf *buf) 152 { 153 #ifdef DEBUG 154 if (debug_level >= 3) { 155 abuf_dbg(buf); 156 dbg_puts(": cleared\n"); 157 } 158 #endif 159 buf->used = 0; 160 buf->start = 0; 161 } 162 163 /* 164 * Get a pointer to the readable block at the given offset. 165 */ 166 unsigned char * 167 abuf_rgetblk(struct abuf *buf, unsigned *rsize, unsigned ofs) 168 { 169 unsigned count, start, used; 170 171 start = buf->start + ofs; 172 used = buf->used - ofs; 173 if (start >= buf->len) 174 start -= buf->len; 175 #ifdef DEBUG 176 if (start >= buf->len || used > buf->used) { 177 abuf_dump(buf); 178 dbg_puts(": rgetblk: bad ofs = "); 179 dbg_putu(ofs); 180 dbg_puts("\n"); 181 dbg_panic(); 182 } 183 #endif 184 count = buf->len - start; 185 if (count > used) 186 count = used; 187 *rsize = count; 188 return (unsigned char *)buf + sizeof(struct abuf) + start * buf->bpf; 189 } 190 191 /* 192 * Discard the block at the start postion. 193 */ 194 void 195 abuf_rdiscard(struct abuf *buf, unsigned count) 196 { 197 #ifdef DEBUG 198 if (count > buf->used) { 199 abuf_dump(buf); 200 dbg_puts(": rdiscard: bad count = "); 201 dbg_putu(count); 202 dbg_puts("\n"); 203 dbg_panic(); 204 } 205 if (debug_level >= 4) { 206 abuf_dbg(buf); 207 dbg_puts(": discard("); 208 dbg_putu(count); 209 dbg_puts(")\n"); 210 } 211 #endif 212 buf->used -= count; 213 buf->start += count; 214 if (buf->start >= buf->len) 215 buf->start -= buf->len; 216 } 217 218 /* 219 * Commit the data written at the end postion. 220 */ 221 void 222 abuf_wcommit(struct abuf *buf, unsigned count) 223 { 224 #ifdef DEBUG 225 if (count > (buf->len - buf->used)) { 226 abuf_dump(buf); 227 dbg_puts(": rdiscard: bad count = "); 228 dbg_putu(count); 229 dbg_puts("\n"); 230 dbg_panic(); 231 } 232 if (debug_level >= 4) { 233 abuf_dbg(buf); 234 dbg_puts(": commit("); 235 dbg_putu(count); 236 dbg_puts(")\n"); 237 } 238 #endif 239 buf->used += count; 240 } 241 242 /* 243 * Get a pointer to the writable block at offset ofs. 244 */ 245 unsigned char * 246 abuf_wgetblk(struct abuf *buf, unsigned *rsize, unsigned ofs) 247 { 248 unsigned end, avail, count; 249 250 251 end = buf->start + buf->used + ofs; 252 if (end >= buf->len) 253 end -= buf->len; 254 #ifdef DEBUG 255 if (end >= buf->len) { 256 abuf_dump(buf); 257 dbg_puts(": wgetblk: bad ofs = "); 258 dbg_putu(ofs); 259 dbg_puts("\n"); 260 dbg_panic(); 261 } 262 #endif 263 avail = buf->len - (buf->used + ofs); 264 count = buf->len - end; 265 if (count > avail) 266 count = avail; 267 *rsize = count; 268 return (unsigned char *)buf + sizeof(struct abuf) + end * buf->bpf; 269 } 270 271 /* 272 * Flush buffer either by dropping samples or by calling the aproc 273 * call-back to consume data. Return 0 if blocked, 1 otherwise. 274 */ 275 int 276 abuf_flush_do(struct abuf *buf) 277 { 278 struct aproc *p; 279 280 p = buf->rproc; 281 if (!p) 282 return 0; 283 #ifdef DEBUG 284 if (debug_level >= 4) { 285 aproc_dbg(p); 286 dbg_puts(": in\n"); 287 } 288 #endif 289 return p->ops->in(p, buf); 290 } 291 292 /* 293 * Fill the buffer either by generating silence or by calling the aproc 294 * call-back to provide data. Return 0 if blocked, 1 otherwise. 295 */ 296 int 297 abuf_fill_do(struct abuf *buf) 298 { 299 struct aproc *p; 300 301 p = buf->wproc; 302 if (!p) 303 return 0; 304 #ifdef DEBUG 305 if (debug_level >= 4) { 306 aproc_dbg(p); 307 dbg_puts(": out\n"); 308 } 309 #endif 310 return p->ops->out(p, buf); 311 } 312 313 /* 314 * Notify the reader that there will be no more input (producer 315 * disappeared) and destroy the buffer. 316 */ 317 void 318 abuf_eof_do(struct abuf *buf) 319 { 320 struct aproc *p; 321 322 p = buf->rproc; 323 if (p) { 324 buf->rproc = NULL; 325 LIST_REMOVE(buf, ient); 326 buf->inuse++; 327 #ifdef DEBUG 328 if (debug_level >= 4) { 329 aproc_dbg(p); 330 dbg_puts(": eof\n"); 331 } 332 #endif 333 p->ops->eof(p, buf); 334 buf->inuse--; 335 } 336 abuf_del(buf); 337 } 338 339 /* 340 * Notify the writer that the buffer has no more consumer, 341 * and destroy the buffer. 342 */ 343 void 344 abuf_hup_do(struct abuf *buf) 345 { 346 struct aproc *p; 347 348 if (ABUF_ROK(buf)) { 349 #ifdef DEBUG 350 if (debug_level >= 3) { 351 abuf_dbg(buf); 352 dbg_puts(": hup: lost "); 353 dbg_putu(buf->used); 354 dbg_puts(" bytes\n"); 355 } 356 #endif 357 buf->used = 0; 358 } 359 p = buf->wproc; 360 if (p != NULL) { 361 buf->wproc = NULL; 362 LIST_REMOVE(buf, oent); 363 buf->inuse++; 364 #ifdef DEBUG 365 if (debug_level >= 3) { 366 aproc_dbg(p); 367 dbg_puts(": hup\n"); 368 } 369 #endif 370 p->ops->hup(p, buf); 371 buf->inuse--; 372 } 373 abuf_del(buf); 374 } 375 376 /* 377 * Notify the read end of the buffer that there is input available 378 * and that data can be processed again. 379 */ 380 int 381 abuf_flush(struct abuf *buf) 382 { 383 if (buf->inuse) { 384 #ifdef DEBUG 385 if (debug_level >= 4) { 386 abuf_dbg(buf); 387 dbg_puts(": flush blocked (inuse)\n"); 388 } 389 #endif 390 } else { 391 buf->inuse++; 392 for (;;) { 393 if (!abuf_flush_do(buf)) 394 break; 395 } 396 buf->inuse--; 397 if (ABUF_HUP(buf)) { 398 abuf_hup_do(buf); 399 return 0; 400 } 401 } 402 return 1; 403 } 404 405 /* 406 * Notify the write end of the buffer that there is room and data can be 407 * written again. This routine can only be called from the out() 408 * call-back of the reader. 409 * 410 * Return 1 if the buffer was filled, and 0 if eof condition occured. The 411 * reader must detach the buffer on EOF condition, since its aproc->eof() 412 * call-back will never be called. 413 */ 414 int 415 abuf_fill(struct abuf *buf) 416 { 417 if (buf->inuse) { 418 #ifdef DEBUG 419 if (debug_level >= 4) { 420 abuf_dbg(buf); 421 dbg_puts(": fill blocked (inuse)\n"); 422 } 423 #endif 424 } else { 425 buf->inuse++; 426 for (;;) { 427 if (!abuf_fill_do(buf)) 428 break; 429 } 430 buf->inuse--; 431 if (ABUF_EOF(buf)) { 432 abuf_eof_do(buf); 433 return 0; 434 } 435 } 436 return 1; 437 } 438 439 /* 440 * Run a read/write loop on the buffer until either the reader or the 441 * writer blocks, or until the buffer reaches eofs. We can not get hup here, 442 * since hup() is only called from terminal nodes, from the main loop. 443 * 444 * NOTE: The buffer may disappear (ie. be free()ed) if eof is reached, so 445 * do not keep references to the buffer or to its writer or reader. 446 */ 447 void 448 abuf_run(struct abuf *buf) 449 { 450 int canfill = 1, canflush = 1; 451 452 if (buf->inuse) { 453 #ifdef DEBUG 454 if (debug_level >= 4) { 455 abuf_dbg(buf); 456 dbg_puts(": run blocked (inuse)\n"); 457 } 458 #endif 459 return; 460 } 461 buf->inuse++; 462 for (;;) { 463 if (canfill) { 464 if (!abuf_fill_do(buf)) 465 canfill = 0; 466 else 467 canflush = 1; 468 } else if (canflush) { 469 if (!abuf_flush_do(buf)) 470 canflush = 0; 471 else 472 canfill = 1; 473 } else 474 break; 475 } 476 buf->inuse--; 477 if (ABUF_EOF(buf)) { 478 abuf_eof_do(buf); 479 return; 480 } 481 if (ABUF_HUP(buf)) { 482 abuf_hup_do(buf); 483 return; 484 } 485 } 486 487 /* 488 * Notify the reader that there will be no more input (producer 489 * disappeared). The buffer is flushed and eof() is called only if all 490 * data is flushed. 491 */ 492 void 493 abuf_eof(struct abuf *buf) 494 { 495 #ifdef DEBUG 496 if (debug_level >= 3) { 497 abuf_dbg(buf); 498 dbg_puts(": eof requested\n"); 499 } 500 if (buf->wproc == NULL) { 501 abuf_dbg(buf); 502 dbg_puts(": eof, no writer\n"); 503 dbg_panic(); 504 } 505 #endif 506 LIST_REMOVE(buf, oent); 507 buf->wproc = NULL; 508 if (buf->rproc != NULL) { 509 if (!abuf_flush(buf)) 510 return; 511 if (ABUF_ROK(buf)) { 512 /* 513 * Could not flush everything, the reader will 514 * have a chance to delete the abuf later. 515 */ 516 #ifdef DEBUG 517 if (debug_level >= 3) { 518 abuf_dbg(buf); 519 dbg_puts(": eof, blocked (drain)\n"); 520 } 521 #endif 522 return; 523 } 524 } 525 if (buf->inuse) { 526 #ifdef DEBUG 527 if (debug_level >= 3) { 528 abuf_dbg(buf); 529 dbg_puts(": eof, blocked (inuse)\n"); 530 } 531 #endif 532 return; 533 } 534 abuf_eof_do(buf); 535 } 536 537 /* 538 * Notify the writer that the buffer has no more consumer, 539 * and that no more data will accepted. 540 */ 541 void 542 abuf_hup(struct abuf *buf) 543 { 544 #ifdef DEBUG 545 if (debug_level >= 3) { 546 abuf_dbg(buf); 547 dbg_puts(": hup requested\n"); 548 } 549 if (buf->rproc == NULL) { 550 abuf_dbg(buf); 551 dbg_puts(": hup, no reader\n"); 552 dbg_panic(); 553 } 554 #endif 555 buf->rproc = NULL; 556 LIST_REMOVE(buf, ient); 557 if (buf->wproc != NULL) { 558 if (buf->inuse) { 559 #ifdef DEBUG 560 if (debug_level >= 3) { 561 abuf_dbg(buf); 562 dbg_puts(": eof, blocked (inuse)\n"); 563 } 564 #endif 565 return; 566 } 567 } 568 abuf_hup_do(buf); 569 } 570 571 /* 572 * Notify the reader of the change of its real-time position 573 */ 574 void 575 abuf_ipos(struct abuf *buf, int delta) 576 { 577 struct aproc *p = buf->rproc; 578 579 if (p && p->ops->ipos) { 580 buf->inuse++; 581 #ifdef DEBUG 582 if (debug_level >= 4) { 583 aproc_dbg(p); 584 dbg_puts(": ipos delta = "); 585 dbg_puti(delta); 586 dbg_puts("\n"); 587 } 588 #endif 589 p->ops->ipos(p, buf, delta); 590 buf->inuse--; 591 } 592 if (ABUF_HUP(buf)) 593 abuf_hup_do(buf); 594 } 595 596 /* 597 * Notify the writer of the change of its real-time position 598 */ 599 void 600 abuf_opos(struct abuf *buf, int delta) 601 { 602 struct aproc *p = buf->wproc; 603 604 if (p && p->ops->opos) { 605 buf->inuse++; 606 #ifdef DEBUG 607 if (debug_level >= 4) { 608 aproc_dbg(p); 609 dbg_puts(": opos delta = "); 610 dbg_puti(delta); 611 dbg_puts("\n"); 612 } 613 #endif 614 p->ops->opos(p, buf, delta); 615 buf->inuse--; 616 } 617 if (ABUF_HUP(buf)) 618 abuf_hup_do(buf); 619 } 620