/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>

#include <machine/cpufunc.h>

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_poll (struct file *fp, int events, struct ucred *cred);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_poll = pipe_poll,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

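/*
 * Illustrative only: the fileops table and event filters above are what
 * read(2), write(2), ioctl(2), kqueue(2) and close(2) ultimately dispatch
 * to for a DTYPE_PIPE descriptor.  A minimal userland sketch exercising
 * those entry points (not part of this file, only standard pipe(2)
 * semantics are assumed):
 *
 *	#include <sys/ioctl.h>
 *	#include <sys/filio.h>
 *	#include <unistd.h>
 *	#include <stdio.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], avail;
 *		char buf[16];
 *
 *		if (pipe(fds) < 0)			// sys_pipe()
 *			return (1);
 *		write(fds[1], "hello", 5);		// pipe_write()
 *		ioctl(fds[0], FIONREAD, &avail);	// pipe_ioctl()
 *		read(fds[0], buf, sizeof(buf));		// pipe_read()
 *		printf("%d bytes were pending\n", avail);
 *		close(fds[0]);				// pipe_close()
 *		close(fds[1]);
 *		return (0);
 *	}
 */
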
MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

/*
 * Default pipe buffer size(s).  This can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	64
#define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */

static int pipe_maxbig = LIMITBIGPIPES;
static int pipe_maxcache = PIPEQ_MAX_CACHE;
static int pipe_bigcount;
static int pipe_nbig;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;
static int pipe_rblocked_count;
static int pipe_wblocked_count;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
	CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
	CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
	CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe blocked on read");
SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
	CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe blocked on write");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
	CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
#ifdef SMP
static int pipe_delay = 5000;	/* 5uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
static int pipe_mpsafe = 1;
SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
	CTLFLAG_RW, &pipe_mpsafe, 0, "");
#endif
#if !defined(NO_PIPE_SYSCTL_STATS)
SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
	CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
	CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
#endif

static void pipeclose (struct pipe *cpipe);
static void pipe_free_kmem (struct pipe *cpipe);
static int pipe_create (struct pipe **cpipep);
static __inline void pipeselwakeup (struct pipe *cpipe);
static int pipespace (struct pipe *cpipe, int size);

static __inline int
pipeseltest(struct pipe *cpipe)
{
	return ((cpipe->pipe_state & PIPE_SEL) ||
		((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) ||
		SLIST_FIRST(&cpipe->pipe_sel.si_note));
}

static __inline void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		get_mplock();
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
		rel_mplock();
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
		get_mplock();
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
		rel_mplock();
	}
	if (SLIST_FIRST(&cpipe->pipe_sel.si_note)) {
		get_mplock();
		KNOTE(&cpipe->pipe_sel.si_note, 0);
		rel_mplock();
	}
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The read token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(struct pipe *cpipe, int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

static __inline void
pipe_get_mplock(int *save)
{
#ifdef SMP
	if (pipe_mpsafe == 0) {
		get_mplock();
		*save = 1;
	} else
#endif
	{
		*save = 0;
	}
}

static __inline void
pipe_rel_mplock(int *save)
{
#ifdef SMP
	if (*save)
		rel_mplock();
#endif
}


/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */

/* ARGSUSED */
int
sys_pipe(struct pipe_args *uap)
{
	struct thread *td = curthread;
	struct proc *p = td->td_proc;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd1, fd2, error;

	KKASSERT(p);

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	error = falloc(p, &rf, &fd1);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	uap->sysmsg_fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = rpipe;
	error = falloc(p, &wf, &fd2);
	if (error) {
		fsetfd(p, NULL, fd1);
		fdrop(rf);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = wpipe;
	uap->sysmsg_fds[1] = fd2;

	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
				    M_PIPE, M_WAITOK|M_ZERO);
	wpipe->pipe_slock = rpipe->pipe_slock;
	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(p, rf, fd1);
	fsetfd(p, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}

/*
 * Allocate kva for the pipe circular buffer.  The space is pageable.
 * This routine will 'realloc' the size of a pipe safely; if it fails
 * it will retain the old buffer and return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size) / PAGE_SIZE;
	object = cpipe->pipe_buffer.object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		get_mplock();
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(&kernel_map);

		error = vm_map_find(&kernel_map, object, 0,
				    (vm_offset_t *)&buffer, size,
				    1,
				    VM_MAPTYPE_NORMAL,
				    VM_PROT_ALL, VM_PROT_ALL,
				    0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			rel_mplock();
			return (ENOMEM);
		}
		pipe_free_kmem(cpipe);
		rel_mplock();
		cpipe->pipe_buffer.object = object;
		cpipe->pipe_buffer.buffer = buffer;
		cpipe->pipe_buffer.size = size;
		++pipe_bkmem_alloc;
	} else {
		++pipe_bcache_alloc;
	}
	cpipe->pipe_buffer.rindex = 0;
	cpipe->pipe_buffer.windex = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(struct pipe **cpipep)
{
	globaldata_t gd = mycpu;
	struct pipe *cpipe;
	int error;

	if ((cpipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = cpipe->pipe_peer;
		--gd->gd_pipeqcount;
		cpipe->pipe_peer = NULL;
		cpipe->pipe_wantwcnt = 0;
	} else {
		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
	}
	*cpipep = cpipe;
	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
		return (error);
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	lwkt_token_init(&cpipe->pipe_rlock);
	lwkt_token_init(&cpipe->pipe_wlock);
	return (0);
}

/*
 * MPALMOSTSAFE (acquires mplock)
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipe *rpipe;
	int error;
	size_t nread = 0;
	int nbio;
	u_int size;	/* total bytes available */
	u_int nsize;	/* total bytes to read */
	u_int rindex;	/* contiguous bytes available */
	int notify_writer;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	int mpsave;
	int bigread;
	int bigcount;

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Setup locks, calculate nbio
	 */
	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	lwkt_gettoken(&rlock, &rpipe->pipe_rlock);

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * Reads are serialized.  Note however that pipe_buffer.buffer and
	 * pipe_buffer.size can change out from under us when the number
	 * of bytes in the buffer are zero due to the write-side doing a
	 * pipespace().
	 */
	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&rlock);
		return (error);
	}
	notify_writer = 0;

	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		cpu_lfence();
		if (size) {
			rindex = rpipe->pipe_buffer.rindex &
				 (rpipe->pipe_buffer.size - 1);
			nsize = size;
			if (nsize > rpipe->pipe_buffer.size - rindex)
				nsize = rpipe->pipe_buffer.size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
					nsize, uio);
			if (error)
				break;
			cpu_mfence();
			rpipe->pipe_buffer.rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.
			 */
			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
				notify_writer = 0;
				continue;
			}

			/*
			 * When the FIFO is less than half full notify any
			 * waiting writer.  WANTW can be checked while
			 * holding just the rlock.
			 */
			notify_writer = 1;
			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
				continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached either when the buffer is completely emptied
		 * or if it becomes more than half-empty.
		 *
		 * Pipe_state can only be modified if both the rlock and
		 * wlock are held.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				notify_writer = 0;
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&wlock);
			}
		}

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On a SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
			continue;

#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				if (rpipe->pipe_buffer.windex !=
				    rpipe->pipe_buffer.rindex) {
					good = 1;
					break;
				}
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpipe->pipe_state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR.
		 */
		lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		if (size) {
			lwkt_reltoken(&wlock);
			continue;
		}

		/*
		 * Retest EOF - acquiring a new token can temporarily release
		 * tokens already held.
		 */
		if (rpipe->pipe_state & PIPE_REOF) {
			lwkt_reltoken(&wlock);
			break;
		}

		/*
		 * If there is no more to read in the pipe, reset its
		 * pointers to the beginning.  This improves cache hit
		 * stats.
		 *
		 * We need both locks to modify both pointers, and there
		 * must also not be a write in progress or the uiomove()
		 * in the write might block and temporarily release
		 * its wlock, then reacquire and update windex.  We are
		 * only serialized against reads, not writes.
		 *
		 * XXX should we even bother resetting the indices?  It
		 * might actually be more cache efficient not to.
		 */
		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
		    rpipe->pipe_wip == 0) {
			rpipe->pipe_buffer.rindex = 0;
			rpipe->pipe_buffer.windex = 0;
		}

		/*
		 * Wait for more data.
		 *
		 * Pipe_state can only be set if both the rlock and wlock
		 * are held.
		 */
		rpipe->pipe_state |= PIPE_WANTR;
		tsleep_interlock(rpipe, PCATCH);
		lwkt_reltoken(&wlock);
		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
		++pipe_rblocked_count;
		if (error)
			break;
	}
	pipe_end_uio(rpipe, &rpipe->pipe_rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread)
		vfs_timestamp(&rpipe->pipe_atime);

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&wlock);
			}
		}
		if (pipeseltest(rpipe)) {
			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
			pipeselwakeup(rpipe);
			lwkt_reltoken(&wlock);
		}
	}
	/*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
	lwkt_reltoken(&rlock);

	pipe_rel_mplock(&mpsave);
	return (error);
}
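
/*
 * The read loop above treats pipe_buffer as a power-of-two ring buffer:
 * rindex and windex are free-running unsigned counters, so the number of
 * buffered bytes is simply (windex - rindex) and the physical offset is
 * (index & (size - 1)); unsigned wraparound of the counters is harmless.
 * A self-contained sketch of just that index arithmetic (illustrative
 * only, the names below are not from this file):
 *
 *	struct ring {
 *		unsigned int	rindex;		// free-running read counter
 *		unsigned int	windex;		// free-running write counter
 *		unsigned int	size;		// must be a power of two
 *		char		*buf;
 *	};
 *
 *	// bytes currently stored in the ring
 *	static unsigned int
 *	ring_avail(const struct ring *r)
 *	{
 *		return (r->windex - r->rindex);
 *	}
 *
 *	// readable bytes before the physical end of the buffer
 *	static unsigned int
 *	ring_contig(const struct ring *r)
 *	{
 *		unsigned int off = r->rindex & (r->size - 1);
 *		unsigned int n = ring_avail(r);
 *
 *		if (n > r->size - off)
 *			n = r->size - off;
 *		return (n);
 *	}
 */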

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	int error;
	int orig_resid;
	int nbio;
	struct pipe *wpipe, *rpipe;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	u_int windex;
	u_int space;
	u_int wcount;
	int mpsave;
	int bigwrite;
	int bigcount;

	pipe_get_mplock(&mpsave);

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;
	lwkt_gettoken(&wlock, &wpipe->pipe_wlock);
	if (wpipe->pipe_state & PIPE_WEOF) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return (error);
	}

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.  We are write-serialized so we can block safely.
	 */
	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (pipe_nbig < pipe_maxbig) &&
	    wpipe->pipe_wantwcnt > 4 &&
	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
		/*
		 * Recheck after lock.
		 */
		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		    (pipe_nbig < pipe_maxbig) &&
		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
			atomic_add_int(&pipe_nbig, 1);
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				++pipe_bigcount;
			else
				atomic_subtract_int(&pipe_nbig, 1);
		}
		lwkt_reltoken(&rlock);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpipe->pipe_buffer.windex &
			 (wpipe->pipe_buffer.size - 1);
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			u_int segsize;

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 *
			 * Limit each uiocopy to no more than PIPE_SIZE
			 * so we can keep the gravy train going on a
			 * SMP box.  This doubles the performance for
			 * write sizes > 16K.  Otherwise large writes
			 * wind up doing an inefficient synchronous
			 * ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > PIPE_SIZE)
				space = PIPE_SIZE;

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size - windex;
			if (segsize > space)
				segsize = space;

#ifdef SMP
			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 *
			 * NOTE: We can't clear WANTR here without acquiring
			 * the rlock, which we don't want to do here!
			 */
			if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
				wakeup(wpipe);
#endif

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex for both in one go so the reader can
			 * read() the data atomically.
			 */
			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
					segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
						segsize, uio);
			}
			if (error)
				break;
			cpu_mfence();
			wpipe->pipe_buffer.windex += space;
			wcount += space;
			continue;
		}

		/*
		 * We need both the rlock and the wlock to interlock against
		 * the EOF, WANTW, and size checks, and to modify pipe_state.
		 *
		 * These are token locks so we do not have to worry about
		 * deadlocks.
		 */
		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);

		/*
		 * If the "read-side" has been blocked, wake it up now
		 * and yield to let it drain synchronously rather
		 * than block.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			lwkt_reltoken(&rlock);
			error = EAGAIN;
			break;
		}

		/*
		 * re-test whether we have to block in the writer after
		 * acquiring both locks, in case the reader opened up
		 * some space.
		 */
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Retest EOF - acquiring a new token can temporarily release
		 * tokens already held.
		 */
		if (wpipe->pipe_state & PIPE_WEOF) {
			lwkt_reltoken(&rlock);
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll.
		 */
		if (space == 0) {
			wpipe->pipe_state |= PIPE_WANTW;
			++wpipe->pipe_wantwcnt;
			pipeselwakeup(wpipe);
			if (wpipe->pipe_state & PIPE_WANTW)
				error = tsleep(wpipe, PCATCH, "pipewr", 0);
			++pipe_wblocked_count;
		}
		lwkt_reltoken(&rlock);

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(wpipe, &wpipe->pipe_wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				lwkt_reltoken(&rlock);
				wakeup(wpipe);
			} else {
				lwkt_reltoken(&rlock);
			}
		}
		if (pipeseltest(wpipe)) {
			lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
			pipeselwakeup(wpipe);
			lwkt_reltoken(&rlock);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	/*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
	lwkt_reltoken(&wlock);
	pipe_rel_mplock(&mpsave);
	return (error);
}
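
/*
 * The (space < uio->uio_resid && orig_resid <= PIPE_BUF) test in the write
 * loop above is what implements the traditional guarantee that writes of
 * at most PIPE_BUF bytes are atomic: rather than storing a partial record,
 * the writer blocks (or returns EAGAIN) until the whole request fits.
 * Illustrative userland consequence (sketch only, not part of this file):
 *
 *	#include <sys/types.h>
 *	#include <assert.h>
 *	#include <err.h>
 *	#include <limits.h>
 *	#include <unistd.h>
 *
 *	// Records of at most PIPE_BUF bytes from concurrent writers are
 *	// never interleaved; larger writes carry no such guarantee.
 *	void
 *	write_record(int fd, const void *rec, size_t len)
 *	{
 *		assert(len <= PIPE_BUF);
 *		if (write(fd, rec, len) != (ssize_t)len)
 *			err(1, "write");
 *	}
 */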

/*
 * MPALMOSTSAFE - acquires mplock
 *
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	   struct ucred *cred, struct sysmsg *msg)
{
	struct pipe *mpipe;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	int error;
	int mpsave;

	pipe_get_mplock(&mpsave);
	mpipe = (struct pipe *)fp->f_data;

	lwkt_gettoken(&rlock, &mpipe->pipe_rlock);
	lwkt_gettoken(&wlock, &mpipe->pipe_wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.windex -
				mpipe->pipe_buffer.rindex;
		error = 0;
		break;
	case FIOSETOWN:
		get_mplock();
		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
		rel_mplock();
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		get_mplock();
		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
		rel_mplock();
		break;

	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&rlock);
	lwkt_reltoken(&wlock);
	pipe_rel_mplock(&mpsave);

	return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 *
 * poll for events (helper)
 */
static int
pipe_poll_events(struct pipe *rpipe, struct pipe *wpipe, int events)
{
	int revents = 0;
	u_int space;

	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex) ||
		    (rpipe->pipe_state & PIPE_REOF)) {
			revents |= events & (POLLIN | POLLRDNORM);
		}
	}

	if (events & (POLLOUT | POLLWRNORM)) {
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_WEOF)) {
			revents |= events & (POLLOUT | POLLWRNORM);
		} else {
			space = wpipe->pipe_buffer.windex -
				wpipe->pipe_buffer.rindex;
			space = wpipe->pipe_buffer.size - space;
			if (space >= PIPE_BUF)
				revents |= events & (POLLOUT | POLLWRNORM);
		}
	}

	if ((rpipe->pipe_state & PIPE_REOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_WEOF)) {
		revents |= POLLHUP;
	}
	return (revents);
}

/*
 * Poll for events from file pointer.
 */
int
pipe_poll(struct file *fp, int events, struct ucred *cred)
{
	lwkt_tokref rpipe_rlock;
	lwkt_tokref rpipe_wlock;
	lwkt_tokref wpipe_rlock;
	lwkt_tokref wpipe_wlock;
	struct pipe *rpipe;
	struct pipe *wpipe;
	int revents = 0;
	int mpsave;

	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;

	revents = pipe_poll_events(rpipe, wpipe, events);
	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			lwkt_gettoken(&rpipe_rlock, &rpipe->pipe_rlock);
			lwkt_gettoken(&rpipe_wlock, &rpipe->pipe_wlock);
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			lwkt_gettoken(&wpipe_rlock, &wpipe->pipe_rlock);
			lwkt_gettoken(&wpipe_wlock, &wpipe->pipe_wlock);
		}
		revents = pipe_poll_events(rpipe, wpipe, events);
		if (revents == 0) {
			if (events & (POLLIN | POLLRDNORM)) {
				selrecord(curthread, &rpipe->pipe_sel);
				rpipe->pipe_state |= PIPE_SEL;
			}

			if (events & (POLLOUT | POLLWRNORM)) {
				selrecord(curthread, &wpipe->pipe_sel);
				wpipe->pipe_state |= PIPE_SEL;
			}
		}
		if (events & (POLLIN | POLLRDNORM)) {
			lwkt_reltoken(&rpipe_rlock);
			lwkt_reltoken(&rpipe_wlock);
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			lwkt_reltoken(&wpipe_rlock);
			lwkt_reltoken(&wpipe_wlock);
		}
	}
	pipe_rel_mplock(&mpsave);
	return (revents);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipe *pipe;
	int mpsave;

	pipe_get_mplock(&mpsave);
	pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	pipe_rel_mplock(&mpsave);
	return (0);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_close(struct file *fp)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)fp->f_data;
	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	rel_mplock();
	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 *
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipe *rpipe;
	struct pipe *wpipe;
	int error = EPIPE;
	lwkt_tokref rpipe_rlock;
	lwkt_tokref rpipe_wlock;
	lwkt_tokref wpipe_rlock;
	lwkt_tokref wpipe_wlock;
	int mpsave;

	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpipe_rlock, &rpipe->pipe_rlock);
	lwkt_gettoken(&rpipe_wlock, &rpipe->pipe_wlock);
	lwkt_gettoken(&wpipe_rlock, &wpipe->pipe_rlock);
	lwkt_gettoken(&wpipe_wlock, &wpipe->pipe_wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		rpipe->pipe_state |= PIPE_REOF;		/* my reads */
		rpipe->pipe_state |= PIPE_WEOF;		/* peer writes */
		if (rpipe->pipe_state & PIPE_WANTR) {
			rpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(rpipe);
		}
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		wpipe->pipe_state |= PIPE_REOF;		/* peer reads */
		wpipe->pipe_state |= PIPE_WEOF;		/* my writes */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		if (wpipe->pipe_state & PIPE_WANTW) {
			wpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(wpipe);
		}
		error = 0;
		break;
	}
	pipeselwakeup(rpipe);
	pipeselwakeup(wpipe);

	lwkt_reltoken(&rpipe_rlock);
	lwkt_reltoken(&rpipe_wlock);
	lwkt_reltoken(&wpipe_rlock);
	lwkt_reltoken(&wpipe_wlock);

	pipe_rel_mplock(&mpsave);
	return (error);
}

static void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			atomic_subtract_int(&pipe_nbig, 1);
		kmem_free(&kernel_map,
			  (vm_offset_t)cpipe->pipe_buffer.buffer,
			  cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
		cpipe->pipe_buffer.object = NULL;
	}
}

/*
 * Close the pipe.  The slock must be held to interlock against simultaneous
 * closes.  The rlock and wlock must be held to adjust the pipe_state.
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;
	lwkt_tokref cpipe_rlock;
	lwkt_tokref cpipe_wlock;
	lwkt_tokref ppipe_rlock;
	lwkt_tokref ppipe_wlock;

	if (cpipe == NULL)
		return;

	/*
	 * The slock may not have been allocated yet (close during
	 * initialization)
	 *
	 * We need both the read and write tokens to modify pipe_state.
	 */
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
	lwkt_gettoken(&cpipe_rlock, &cpipe->pipe_rlock);
	lwkt_gettoken(&cpipe_wlock, &cpipe->pipe_wlock);

	/*
	 * Set our state, wakeup anyone waiting in select, and
	 * wakeup anyone blocked on our pipe.
	 */
	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
	pipeselwakeup(cpipe);
	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(cpipe);
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		lwkt_gettoken(&ppipe_rlock, &ppipe->pipe_rlock);
		lwkt_gettoken(&ppipe_wlock, &ppipe->pipe_wlock);
		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
		pipeselwakeup(ppipe);
		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
			wakeup(ppipe);
		}
		if (SLIST_FIRST(&ppipe->pipe_sel.si_note)) {
			get_mplock();
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			rel_mplock();
		}
		lwkt_reltoken(&ppipe_rlock);
		lwkt_reltoken(&ppipe_wlock);
	}

	/*
	 * If the peer is also closed we can free resources for both
	 * sides, otherwise we leave our side intact to deal with any
	 * races (since we only have the slock).
	 */
	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
		cpipe->pipe_peer = NULL;
		ppipe->pipe_peer = NULL;
		ppipe->pipe_slock = NULL;	/* we will free the slock */
		pipeclose(ppipe);
		ppipe = NULL;
	}

	lwkt_reltoken(&cpipe_rlock);
	lwkt_reltoken(&cpipe_wlock);
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_RELEASE);

	/*
	 * If we disassociated from our peer we can free resources
	 */
	if (ppipe == NULL) {
		gd = mycpu;
		if (cpipe->pipe_slock) {
			kfree(cpipe->pipe_slock, M_PIPE);
			cpipe->pipe_slock = NULL;
		}
		if (gd->gd_pipeqcount >= pipe_maxcache ||
		    cpipe->pipe_buffer.size != PIPE_SIZE
		) {
			pipe_free_kmem(cpipe);
			kfree(cpipe, M_PIPE);
		} else {
			cpipe->pipe_state = 0;
			cpipe->pipe_peer = gd->gd_pipeq;
			gd->gd_pipeq = cpipe;
			++gd->gd_pipeqcount;
		}
	}
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL) {
			/* other end of pipe has been closed */
			rel_mplock();
			return (EPIPE);
		}
		break;
	default:
		rel_mplock();	/* don't leak the mplock on unknown filters */
		return (1);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	rel_mplock();
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;

	/* XXX RACE */
	if (rpipe->pipe_state & PIPE_REOF) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	u_int32_t space;

	/* XXX RACE */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_WEOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	space = wpipe->pipe_buffer.windex -
		wpipe->pipe_buffer.rindex;
	space = wpipe->pipe_buffer.size - space;
	kn->kn_data = space;
	return (kn->kn_data >= PIPE_BUF);
}
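
/*
 * The two filters above back EVFILT_READ and EVFILT_WRITE for pipe
 * descriptors.  A userland sketch of how they are typically exercised
 * (illustrative only, not part of this file, error handling trimmed):
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *	#include <unistd.h>
 *
 *	// Block until the read side of a pipe has data (or sees EOF).
 *	void
 *	wait_readable(int pfd)
 *	{
 *		struct kevent kev;
 *		int kq = kqueue();
 *
 *		EV_SET(&kev, pfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *		kevent(kq, &kev, 1, NULL, 0, NULL);	// register: pipe_kqfilter()
 *		kevent(kq, NULL, 0, &kev, 1, NULL);	// wait: filt_piperead()
 *		// kev.data holds the byte count computed by filt_piperead();
 *		// EV_EOF is set once the write side is gone (PIPE_REOF).
 *		close(kq);
 *	}
 */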