1 /* $OpenBSD: sys_pipe.c,v 1.65 2014/01/24 06:00:01 guenther Exp $ */ 2 3 /* 4 * Copyright (c) 1996 John S. Dyson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice immediately at the beginning of the file, without modification, 12 * this list of conditions, and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Absolutely no warranty of function or purpose is made by the author 17 * John S. Dyson. 18 * 4. Modifications may be freely made to this file if the above conditions 19 * are met. 20 */ 21 22 /* 23 * This file contains a high-performance replacement for the socket-based 24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 25 * all features of sockets, but does do everything that pipes normally 26 * do. 27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 44 #include <uvm/uvm_extern.h> 45 46 #include <sys/pipe.h> 47 48 /* 49 * interfaces to the outside world 50 */ 51 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 52 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 53 int pipe_close(struct file *, struct proc *); 54 int pipe_poll(struct file *, int events, struct proc *); 55 int pipe_kqfilter(struct file *fp, struct knote *kn); 56 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 57 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 58 59 static struct fileops pipeops = { 60 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 61 pipe_stat, pipe_close 62 }; 63 64 void filt_pipedetach(struct knote *kn); 65 int filt_piperead(struct knote *kn, long hint); 66 int filt_pipewrite(struct knote *kn, long hint); 67 68 struct filterops pipe_rfiltops = 69 { 1, NULL, filt_pipedetach, filt_piperead }; 70 struct filterops pipe_wfiltops = 71 { 1, NULL, filt_pipedetach, filt_pipewrite }; 72 73 /* 74 * Default pipe buffer size(s), this can be kind-of large now because pipe 75 * space is pageable. The pipe code will try to maintain locality of 76 * reference for performance reasons, so small amounts of outstanding I/O 77 * will not wipe the cache. 78 */ 79 #define MINPIPESIZE (PIPE_SIZE/3) 80 81 /* 82 * Limit the number of "big" pipes 83 */ 84 #define LIMITBIGPIPES 32 85 int nbigpipe; 86 static int amountpipekva; 87 88 struct pool pipe_pool; 89 90 void pipeclose(struct pipe *); 91 void pipe_free_kmem(struct pipe *); 92 int pipe_create(struct pipe *); 93 int pipelock(struct pipe *); 94 void pipeunlock(struct pipe *); 95 void pipeselwakeup(struct pipe *); 96 int pipespace(struct pipe *, u_int); 97 98 /* 99 * The pipe system call for the DTYPE_PIPE type of pipes 100 */ 101 102 /* ARGSUSED */ 103 int 104 sys_pipe(struct proc *p, void *v, register_t *retval) 105 { 106 struct sys_pipe_args /* { 107 syscallarg(int *) fdp; 108 } */ *uap = v; 109 struct filedesc *fdp = p->p_fd; 110 struct file *rf, *wf; 111 struct pipe *rpipe, *wpipe = NULL; 112 int fds[2], error; 113 114 rpipe = pool_get(&pipe_pool, PR_WAITOK); 115 error = pipe_create(rpipe); 116 if (error != 0) 117 goto free1; 118 wpipe = pool_get(&pipe_pool, PR_WAITOK); 119 error = pipe_create(wpipe); 120 if (error != 0) 121 goto free1; 122 123 fdplock(fdp); 124 125 error = falloc(p, &rf, &fds[0]); 126 if (error != 0) 127 goto free2; 128 rf->f_flag = FREAD | FWRITE; 129 rf->f_type = DTYPE_PIPE; 130 rf->f_data = rpipe; 131 rf->f_ops = &pipeops; 132 133 error = falloc(p, &wf, &fds[1]); 134 if (error != 0) 135 goto free3; 136 wf->f_flag = FREAD | FWRITE; 137 wf->f_type = DTYPE_PIPE; 138 wf->f_data = wpipe; 139 wf->f_ops = &pipeops; 140 141 rpipe->pipe_peer = wpipe; 142 wpipe->pipe_peer = rpipe; 143 144 FILE_SET_MATURE(rf, p); 145 FILE_SET_MATURE(wf, p); 146 147 error = copyout(fds, SCARG(uap, fdp), sizeof(fds)); 148 if (error != 0) { 149 fdrelease(p, fds[0]); 150 fdrelease(p, fds[1]); 151 } 152 fdpunlock(fdp); 153 return (error); 154 155 free3: 156 fdremove(fdp, fds[0]); 157 closef(rf, p); 158 rpipe = NULL; 159 free2: 160 fdpunlock(fdp); 161 free1: 162 pipeclose(wpipe); 163 pipeclose(rpipe); 164 return (error); 165 } 166 167 /* 168 * Allocate kva for pipe circular buffer, the space is pageable. 169 * This routine will 'realloc' the size of a pipe safely, if it fails 170 * it will retain the old buffer. 171 * If it fails it will return ENOMEM. 172 */ 173 int 174 pipespace(struct pipe *cpipe, u_int size) 175 { 176 caddr_t buffer; 177 178 buffer = (caddr_t)uvm_km_valloc(kernel_map, size); 179 if (buffer == NULL) { 180 return (ENOMEM); 181 } 182 183 /* free old resources if we are resizing */ 184 pipe_free_kmem(cpipe); 185 cpipe->pipe_buffer.buffer = buffer; 186 cpipe->pipe_buffer.size = size; 187 cpipe->pipe_buffer.in = 0; 188 cpipe->pipe_buffer.out = 0; 189 cpipe->pipe_buffer.cnt = 0; 190 191 amountpipekva += cpipe->pipe_buffer.size; 192 193 return (0); 194 } 195 196 /* 197 * initialize and allocate VM and memory for pipe 198 */ 199 int 200 pipe_create(struct pipe *cpipe) 201 { 202 int error; 203 204 /* so pipe_free_kmem() doesn't follow junk pointer */ 205 cpipe->pipe_buffer.buffer = NULL; 206 /* 207 * protect so pipeclose() doesn't follow a junk pointer 208 * if pipespace() fails. 209 */ 210 memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel)); 211 cpipe->pipe_state = 0; 212 cpipe->pipe_peer = NULL; 213 cpipe->pipe_busy = 0; 214 215 error = pipespace(cpipe, PIPE_SIZE); 216 if (error != 0) 217 return (error); 218 219 getnanotime(&cpipe->pipe_ctime); 220 cpipe->pipe_atime = cpipe->pipe_ctime; 221 cpipe->pipe_mtime = cpipe->pipe_ctime; 222 cpipe->pipe_pgid = NO_PID; 223 224 return (0); 225 } 226 227 228 /* 229 * lock a pipe for I/O, blocking other access 230 */ 231 int 232 pipelock(struct pipe *cpipe) 233 { 234 int error; 235 while (cpipe->pipe_state & PIPE_LOCK) { 236 cpipe->pipe_state |= PIPE_LWANT; 237 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 238 return error; 239 } 240 cpipe->pipe_state |= PIPE_LOCK; 241 return 0; 242 } 243 244 /* 245 * unlock a pipe I/O lock 246 */ 247 void 248 pipeunlock(struct pipe *cpipe) 249 { 250 cpipe->pipe_state &= ~PIPE_LOCK; 251 if (cpipe->pipe_state & PIPE_LWANT) { 252 cpipe->pipe_state &= ~PIPE_LWANT; 253 wakeup(cpipe); 254 } 255 } 256 257 void 258 pipeselwakeup(struct pipe *cpipe) 259 { 260 if (cpipe->pipe_state & PIPE_SEL) { 261 cpipe->pipe_state &= ~PIPE_SEL; 262 selwakeup(&cpipe->pipe_sel); 263 } else 264 KNOTE(&cpipe->pipe_sel.si_note, 0); 265 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 266 gsignal(cpipe->pipe_pgid, SIGIO); 267 } 268 269 /* ARGSUSED */ 270 int 271 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 272 { 273 struct pipe *rpipe = (struct pipe *) fp->f_data; 274 int error; 275 int nread = 0; 276 int size; 277 278 error = pipelock(rpipe); 279 if (error) 280 return (error); 281 282 ++rpipe->pipe_busy; 283 284 while (uio->uio_resid) { 285 /* 286 * normal pipe buffer receive 287 */ 288 if (rpipe->pipe_buffer.cnt > 0) { 289 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 290 if (size > rpipe->pipe_buffer.cnt) 291 size = rpipe->pipe_buffer.cnt; 292 if (size > uio->uio_resid) 293 size = uio->uio_resid; 294 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 295 size, uio); 296 if (error) { 297 break; 298 } 299 rpipe->pipe_buffer.out += size; 300 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 301 rpipe->pipe_buffer.out = 0; 302 303 rpipe->pipe_buffer.cnt -= size; 304 /* 305 * If there is no more to read in the pipe, reset 306 * its pointers to the beginning. This improves 307 * cache hit stats. 308 */ 309 if (rpipe->pipe_buffer.cnt == 0) { 310 rpipe->pipe_buffer.in = 0; 311 rpipe->pipe_buffer.out = 0; 312 } 313 nread += size; 314 } else { 315 /* 316 * detect EOF condition 317 * read returns 0 on EOF, no need to set error 318 */ 319 if (rpipe->pipe_state & PIPE_EOF) 320 break; 321 322 /* 323 * If the "write-side" has been blocked, wake it up now. 324 */ 325 if (rpipe->pipe_state & PIPE_WANTW) { 326 rpipe->pipe_state &= ~PIPE_WANTW; 327 wakeup(rpipe); 328 } 329 330 /* 331 * Break if some data was read. 332 */ 333 if (nread > 0) 334 break; 335 336 /* 337 * Unlock the pipe buffer for our remaining processing. 338 * We will either break out with an error or we will 339 * sleep and relock to loop. 340 */ 341 pipeunlock(rpipe); 342 343 /* 344 * Handle non-blocking mode operation or 345 * wait for more data. 346 */ 347 if (fp->f_flag & FNONBLOCK) { 348 error = EAGAIN; 349 } else { 350 rpipe->pipe_state |= PIPE_WANTR; 351 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 352 error = pipelock(rpipe); 353 } 354 if (error) 355 goto unlocked_error; 356 } 357 } 358 pipeunlock(rpipe); 359 360 if (error == 0) 361 getnanotime(&rpipe->pipe_atime); 362 unlocked_error: 363 --rpipe->pipe_busy; 364 365 /* 366 * PIPE_WANT processing only makes sense if pipe_busy is 0. 367 */ 368 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 369 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 370 wakeup(rpipe); 371 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 372 /* 373 * Handle write blocking hysteresis. 374 */ 375 if (rpipe->pipe_state & PIPE_WANTW) { 376 rpipe->pipe_state &= ~PIPE_WANTW; 377 wakeup(rpipe); 378 } 379 } 380 381 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 382 pipeselwakeup(rpipe); 383 384 return (error); 385 } 386 387 int 388 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 389 { 390 int error = 0; 391 int orig_resid; 392 393 struct pipe *wpipe, *rpipe; 394 395 rpipe = (struct pipe *) fp->f_data; 396 wpipe = rpipe->pipe_peer; 397 398 /* 399 * detect loss of pipe read side, issue SIGPIPE if lost. 400 */ 401 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 402 return (EPIPE); 403 } 404 ++wpipe->pipe_busy; 405 406 /* 407 * If it is advantageous to resize the pipe buffer, do 408 * so. 409 */ 410 if ((uio->uio_resid > PIPE_SIZE) && 411 (nbigpipe < LIMITBIGPIPES) && 412 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 413 (wpipe->pipe_buffer.cnt == 0)) { 414 415 if ((error = pipelock(wpipe)) == 0) { 416 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 417 nbigpipe++; 418 pipeunlock(wpipe); 419 } 420 } 421 422 /* 423 * If an early error occurred unbusy and return, waking up any pending 424 * readers. 425 */ 426 if (error) { 427 --wpipe->pipe_busy; 428 if ((wpipe->pipe_busy == 0) && 429 (wpipe->pipe_state & PIPE_WANT)) { 430 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 431 wakeup(wpipe); 432 } 433 return (error); 434 } 435 436 orig_resid = uio->uio_resid; 437 438 while (uio->uio_resid) { 439 int space; 440 441 retrywrite: 442 if (wpipe->pipe_state & PIPE_EOF) { 443 error = EPIPE; 444 break; 445 } 446 447 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 448 449 /* Writes of size <= PIPE_BUF must be atomic. */ 450 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 451 space = 0; 452 453 if (space > 0) { 454 if ((error = pipelock(wpipe)) == 0) { 455 int size; /* Transfer size */ 456 int segsize; /* first segment to transfer */ 457 458 /* 459 * If a process blocked in uiomove, our 460 * value for space might be bad. 461 * 462 * XXX will we be ok if the reader has gone 463 * away here? 464 */ 465 if (space > wpipe->pipe_buffer.size - 466 wpipe->pipe_buffer.cnt) { 467 pipeunlock(wpipe); 468 goto retrywrite; 469 } 470 471 /* 472 * Transfer size is minimum of uio transfer 473 * and free space in pipe buffer. 474 */ 475 if (space > uio->uio_resid) 476 size = uio->uio_resid; 477 else 478 size = space; 479 /* 480 * First segment to transfer is minimum of 481 * transfer size and contiguous space in 482 * pipe buffer. If first segment to transfer 483 * is less than the transfer size, we've got 484 * a wraparound in the buffer. 485 */ 486 segsize = wpipe->pipe_buffer.size - 487 wpipe->pipe_buffer.in; 488 if (segsize > size) 489 segsize = size; 490 491 /* Transfer first segment */ 492 493 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 494 segsize, uio); 495 496 if (error == 0 && segsize < size) { 497 /* 498 * Transfer remaining part now, to 499 * support atomic writes. Wraparound 500 * happened. 501 */ 502 #ifdef DIAGNOSTIC 503 if (wpipe->pipe_buffer.in + segsize != 504 wpipe->pipe_buffer.size) 505 panic("Expected pipe buffer wraparound disappeared"); 506 #endif 507 508 error = uiomove(&wpipe->pipe_buffer.buffer[0], 509 size - segsize, uio); 510 } 511 if (error == 0) { 512 wpipe->pipe_buffer.in += size; 513 if (wpipe->pipe_buffer.in >= 514 wpipe->pipe_buffer.size) { 515 #ifdef DIAGNOSTIC 516 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 517 panic("Expected wraparound bad"); 518 #endif 519 wpipe->pipe_buffer.in = size - segsize; 520 } 521 522 wpipe->pipe_buffer.cnt += size; 523 #ifdef DIAGNOSTIC 524 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 525 panic("Pipe buffer overflow"); 526 #endif 527 } 528 pipeunlock(wpipe); 529 } 530 if (error) 531 break; 532 } else { 533 /* 534 * If the "read-side" has been blocked, wake it up now. 535 */ 536 if (wpipe->pipe_state & PIPE_WANTR) { 537 wpipe->pipe_state &= ~PIPE_WANTR; 538 wakeup(wpipe); 539 } 540 541 /* 542 * don't block on non-blocking I/O 543 */ 544 if (fp->f_flag & FNONBLOCK) { 545 error = EAGAIN; 546 break; 547 } 548 549 /* 550 * We have no more space and have something to offer, 551 * wake up select/poll. 552 */ 553 pipeselwakeup(wpipe); 554 555 wpipe->pipe_state |= PIPE_WANTW; 556 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 557 "pipewr", 0); 558 if (error) 559 break; 560 /* 561 * If read side wants to go away, we just issue a 562 * signal to ourselves. 563 */ 564 if (wpipe->pipe_state & PIPE_EOF) { 565 error = EPIPE; 566 break; 567 } 568 } 569 } 570 571 --wpipe->pipe_busy; 572 573 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 574 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 575 wakeup(wpipe); 576 } else if (wpipe->pipe_buffer.cnt > 0) { 577 /* 578 * If we have put any characters in the buffer, we wake up 579 * the reader. 580 */ 581 if (wpipe->pipe_state & PIPE_WANTR) { 582 wpipe->pipe_state &= ~PIPE_WANTR; 583 wakeup(wpipe); 584 } 585 } 586 587 /* 588 * Don't return EPIPE if I/O was successful 589 */ 590 if ((wpipe->pipe_buffer.cnt == 0) && 591 (uio->uio_resid == 0) && 592 (error == EPIPE)) { 593 error = 0; 594 } 595 596 if (error == 0) 597 getnanotime(&wpipe->pipe_mtime); 598 /* 599 * We have something to offer, wake up select/poll. 600 */ 601 if (wpipe->pipe_buffer.cnt) 602 pipeselwakeup(wpipe); 603 604 return (error); 605 } 606 607 /* 608 * we implement a very minimal set of ioctls for compatibility with sockets. 609 */ 610 int 611 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) 612 { 613 struct pipe *mpipe = (struct pipe *)fp->f_data; 614 615 switch (cmd) { 616 617 case FIONBIO: 618 return (0); 619 620 case FIOASYNC: 621 if (*(int *)data) { 622 mpipe->pipe_state |= PIPE_ASYNC; 623 } else { 624 mpipe->pipe_state &= ~PIPE_ASYNC; 625 } 626 return (0); 627 628 case FIONREAD: 629 *(int *)data = mpipe->pipe_buffer.cnt; 630 return (0); 631 632 case SIOCSPGRP: 633 mpipe->pipe_pgid = *(int *)data; 634 return (0); 635 636 case SIOCGPGRP: 637 *(int *)data = mpipe->pipe_pgid; 638 return (0); 639 640 } 641 return (ENOTTY); 642 } 643 644 int 645 pipe_poll(struct file *fp, int events, struct proc *p) 646 { 647 struct pipe *rpipe = (struct pipe *)fp->f_data; 648 struct pipe *wpipe; 649 int revents = 0; 650 651 wpipe = rpipe->pipe_peer; 652 if (events & (POLLIN | POLLRDNORM)) { 653 if ((rpipe->pipe_buffer.cnt > 0) || 654 (rpipe->pipe_state & PIPE_EOF)) 655 revents |= events & (POLLIN | POLLRDNORM); 656 } 657 658 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 659 if ((rpipe->pipe_state & PIPE_EOF) || 660 (wpipe == NULL) || 661 (wpipe->pipe_state & PIPE_EOF)) 662 revents |= POLLHUP; 663 else if (events & (POLLOUT | POLLWRNORM)) { 664 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 665 revents |= events & (POLLOUT | POLLWRNORM); 666 } 667 668 if (revents == 0) { 669 if (events & (POLLIN | POLLRDNORM)) { 670 selrecord(p, &rpipe->pipe_sel); 671 rpipe->pipe_state |= PIPE_SEL; 672 } 673 if (events & (POLLOUT | POLLWRNORM)) { 674 selrecord(p, &wpipe->pipe_sel); 675 wpipe->pipe_state |= PIPE_SEL; 676 } 677 } 678 return (revents); 679 } 680 681 int 682 pipe_stat(struct file *fp, struct stat *ub, struct proc *p) 683 { 684 struct pipe *pipe = (struct pipe *)fp->f_data; 685 686 memset(ub, 0, sizeof(*ub)); 687 ub->st_mode = S_IFIFO; 688 ub->st_blksize = pipe->pipe_buffer.size; 689 ub->st_size = pipe->pipe_buffer.cnt; 690 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 691 ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec; 692 ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec; 693 ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec; 694 ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec; 695 ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec; 696 ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec; 697 ub->st_uid = fp->f_cred->cr_uid; 698 ub->st_gid = fp->f_cred->cr_gid; 699 /* 700 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 701 * XXX (st_dev, st_ino) should be unique. 702 */ 703 return (0); 704 } 705 706 /* ARGSUSED */ 707 int 708 pipe_close(struct file *fp, struct proc *p) 709 { 710 struct pipe *cpipe = (struct pipe *)fp->f_data; 711 712 fp->f_ops = NULL; 713 fp->f_data = NULL; 714 pipeclose(cpipe); 715 return (0); 716 } 717 718 void 719 pipe_free_kmem(struct pipe *cpipe) 720 { 721 if (cpipe->pipe_buffer.buffer != NULL) { 722 if (cpipe->pipe_buffer.size > PIPE_SIZE) 723 --nbigpipe; 724 amountpipekva -= cpipe->pipe_buffer.size; 725 uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer, 726 cpipe->pipe_buffer.size); 727 cpipe->pipe_buffer.buffer = NULL; 728 } 729 } 730 731 /* 732 * shutdown the pipe 733 */ 734 void 735 pipeclose(struct pipe *cpipe) 736 { 737 struct pipe *ppipe; 738 if (cpipe) { 739 740 pipeselwakeup(cpipe); 741 742 /* 743 * If the other side is blocked, wake it up saying that 744 * we want to close it down. 745 */ 746 cpipe->pipe_state |= PIPE_EOF; 747 while (cpipe->pipe_busy) { 748 wakeup(cpipe); 749 cpipe->pipe_state |= PIPE_WANT; 750 tsleep(cpipe, PRIBIO, "pipecl", 0); 751 } 752 753 /* 754 * Disconnect from peer 755 */ 756 if ((ppipe = cpipe->pipe_peer) != NULL) { 757 pipeselwakeup(ppipe); 758 759 ppipe->pipe_state |= PIPE_EOF; 760 wakeup(ppipe); 761 ppipe->pipe_peer = NULL; 762 } 763 764 /* 765 * free resources 766 */ 767 pipe_free_kmem(cpipe); 768 pool_put(&pipe_pool, cpipe); 769 } 770 } 771 772 int 773 pipe_kqfilter(struct file *fp, struct knote *kn) 774 { 775 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 776 struct pipe *wpipe = rpipe->pipe_peer; 777 778 switch (kn->kn_filter) { 779 case EVFILT_READ: 780 kn->kn_fop = &pipe_rfiltops; 781 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 782 break; 783 case EVFILT_WRITE: 784 if (wpipe == NULL) { 785 /* other end of pipe has been closed */ 786 return (EPIPE); 787 } 788 kn->kn_fop = &pipe_wfiltops; 789 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 790 break; 791 default: 792 return (EINVAL); 793 } 794 795 return (0); 796 } 797 798 void 799 filt_pipedetach(struct knote *kn) 800 { 801 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 802 struct pipe *wpipe = rpipe->pipe_peer; 803 804 switch (kn->kn_filter) { 805 case EVFILT_READ: 806 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 807 break; 808 case EVFILT_WRITE: 809 if (wpipe == NULL) 810 return; 811 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 812 break; 813 } 814 } 815 816 /*ARGSUSED*/ 817 int 818 filt_piperead(struct knote *kn, long hint) 819 { 820 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 821 struct pipe *wpipe = rpipe->pipe_peer; 822 823 kn->kn_data = rpipe->pipe_buffer.cnt; 824 825 if ((rpipe->pipe_state & PIPE_EOF) || 826 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 827 kn->kn_flags |= EV_EOF; 828 return (1); 829 } 830 return (kn->kn_data > 0); 831 } 832 833 /*ARGSUSED*/ 834 int 835 filt_pipewrite(struct knote *kn, long hint) 836 { 837 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 838 struct pipe *wpipe = rpipe->pipe_peer; 839 840 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 841 kn->kn_data = 0; 842 kn->kn_flags |= EV_EOF; 843 return (1); 844 } 845 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 846 847 return (kn->kn_data >= PIPE_BUF); 848 } 849 850 void 851 pipe_init(void) 852 { 853 pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl", 854 &pool_allocator_nointr); 855 } 856 857