1 /* 2 * Copyright (c) 1996 John S. Dyson 3 * All rights reserved. 4 * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved. 5 * 6 * This code is derived from software contributed to The DragonFly Project 7 * by Matthew Dillon <dillon@backplane.com> 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice immediately at the beginning of the file, without modification, 14 * this list of conditions, and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 3. Absolutely no warranty of function or purpose is made by the author 19 * John S. Dyson. 20 * 4. Modifications may be freely made to this file if the above conditions 21 * are met. 22 */ 23 24 /* 25 * This file contains a high-performance replacement for the socket-based 26 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 27 * all features of sockets, but does do everything that pipes normally 28 * do. 
*/
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

/*
 * Per-cpu mutex.  pipeinit() allocates one of these per cpu and
 * pipeclose() holds the current cpu's mutex while freeing pipe buffer
 * KVA, which limits how many threads hit the kernel_map at once.
 */
struct pipegdlock {
	struct mtx	mtx;
} __cachealign;

/*
 * interfaces to the outside world
 *
 * These are the struct fileops entry points installed on both
 * descriptors returned by pipe()/pipe2().
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

/* File operations vector shared by both ends of every pipe. */
__read_mostly static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

/* kqueue filter callbacks, see pipe_kqfilter() */
static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

/* Filter ops selected for EVFILT_READ and EVFILT_WRITE respectively. */
__read_mostly static struct filterops pipe_rfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */

/*
 * pipe_maxcache bounds the per-cpu free list of pipe structures that
 * pipeclose() maintains; pipeinit() may scale it up from the default.
 * pipe_gdlocks is the per-cpu mutex array allocated by pipeinit().
 */
__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
        CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 *
 * pipespace() page-rounds the value and clamps it to 16384..1MB.
 * NOTE(review): the read/write paths index the ring buffer with
 * (size - 1) as a mask, so the value should be a power of two.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
        CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the write completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.  A value of 0 disables the busy-loop.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;	/* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
        CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
146 */ 147 static 148 void 149 pipeinit(void *dummy) 150 { 151 size_t mbytes = kmem_lim_size(); 152 int n; 153 154 if (pipe_maxcache == PIPEQ_MAX_CACHE) { 155 if (mbytes >= 7 * 1024) 156 pipe_maxcache *= 2; 157 if (mbytes >= 15 * 1024) 158 pipe_maxcache *= 2; 159 } 160 pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus, 161 M_PIPE, M_WAITOK | M_ZERO); 162 for (n = 0; n < ncpus; ++n) 163 mtx_init(&pipe_gdlocks[n].mtx, "pipekm"); 164 } 165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL); 166 167 static void pipeclose (struct pipe *pipe, 168 struct pipebuf *pbr, struct pipebuf *pbw); 169 static void pipe_free_kmem (struct pipebuf *buf); 170 static int pipe_create (struct pipe **pipep); 171 172 /* 173 * Test and clear the specified flag, wakeup(pb) if it was set. 174 * This function must also act as a memory barrier. 175 */ 176 static __inline void 177 pipesignal(struct pipebuf *pb, uint32_t flags) 178 { 179 uint32_t oflags; 180 uint32_t nflags; 181 182 for (;;) { 183 oflags = pb->state; 184 cpu_ccfence(); 185 nflags = oflags & ~flags; 186 if (atomic_cmpset_int(&pb->state, oflags, nflags)) 187 break; 188 } 189 if (oflags & flags) 190 wakeup(pb); 191 } 192 193 /* 194 * 195 */ 196 static __inline void 197 pipewakeup(struct pipebuf *pb, int dosigio) 198 { 199 if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) { 200 lwkt_gettoken(&sigio_token); 201 pgsigio(pb->sigio, SIGIO, 0); 202 lwkt_reltoken(&sigio_token); 203 } 204 KNOTE(&pb->kq.ki_note, 0); 205 } 206 207 /* 208 * These routines are called before and after a UIO. The UIO 209 * may block, causing our held tokens to be lost temporarily. 210 * 211 * We use these routines to serialize reads against other reads 212 * and writes against other writes. 213 * 214 * The appropriate token is held on entry so *ipp does not race. 
215 */ 216 static __inline int 217 pipe_start_uio(int *ipp) 218 { 219 int error; 220 221 while (*ipp) { 222 *ipp = -1; 223 error = tsleep(ipp, PCATCH, "pipexx", 0); 224 if (error) 225 return (error); 226 } 227 *ipp = 1; 228 return (0); 229 } 230 231 static __inline void 232 pipe_end_uio(int *ipp) 233 { 234 if (*ipp < 0) { 235 *ipp = 0; 236 wakeup(ipp); 237 } else { 238 KKASSERT(*ipp > 0); 239 *ipp = 0; 240 } 241 } 242 243 /* 244 * The pipe system call for the DTYPE_PIPE type of pipes 245 * 246 * pipe_args(int dummy) 247 * 248 * MPSAFE 249 */ 250 int 251 sys_pipe(struct pipe_args *uap) 252 { 253 return kern_pipe(uap->sysmsg_fds, 0); 254 } 255 256 int 257 sys_pipe2(struct pipe2_args *uap) 258 { 259 return kern_pipe(uap->sysmsg_fds, uap->flags); 260 } 261 262 int 263 kern_pipe(long *fds, int flags) 264 { 265 struct thread *td = curthread; 266 struct filedesc *fdp = td->td_proc->p_fd; 267 struct file *rf, *wf; 268 struct pipe *pipe; 269 int fd1, fd2, error; 270 271 pipe = NULL; 272 if (pipe_create(&pipe)) { 273 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB); 274 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); 275 return (ENFILE); 276 } 277 278 error = falloc(td->td_lwp, &rf, &fd1); 279 if (error) { 280 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB); 281 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); 282 return (error); 283 } 284 fds[0] = fd1; 285 286 /* 287 * Warning: once we've gotten past allocation of the fd for the 288 * read-side, we can only drop the read side via fdrop() in order 289 * to avoid races against processes which manage to dup() the read 290 * side while we are blocked trying to allocate the write side. 
291 */ 292 rf->f_type = DTYPE_PIPE; 293 rf->f_flag = FREAD | FWRITE; 294 rf->f_ops = &pipeops; 295 rf->f_data = (void *)((intptr_t)pipe | 0); 296 if (flags & O_NONBLOCK) 297 rf->f_flag |= O_NONBLOCK; 298 if (flags & O_CLOEXEC) 299 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE; 300 301 error = falloc(td->td_lwp, &wf, &fd2); 302 if (error) { 303 fsetfd(fdp, NULL, fd1); 304 fdrop(rf); 305 /* pipeA has been closed by fdrop() */ 306 /* close pipeB here */ 307 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); 308 return (error); 309 } 310 wf->f_type = DTYPE_PIPE; 311 wf->f_flag = FREAD | FWRITE; 312 wf->f_ops = &pipeops; 313 wf->f_data = (void *)((intptr_t)pipe | 1); 314 if (flags & O_NONBLOCK) 315 wf->f_flag |= O_NONBLOCK; 316 if (flags & O_CLOEXEC) 317 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE; 318 319 fds[1] = fd2; 320 321 /* 322 * Once activated the peer relationship remains valid until 323 * both sides are closed. 324 */ 325 fsetfd(fdp, rf, fd1); 326 fsetfd(fdp, wf, fd2); 327 fdrop(rf); 328 fdrop(wf); 329 330 return (0); 331 } 332 333 /* 334 * [re]allocates KVA for the pipe's circular buffer. The space is 335 * pageable. Called twice to setup full-duplex communications. 336 * 337 * NOTE: Independent vm_object's are used to improve performance. 338 * 339 * Returns 0 on success, ENOMEM on failure. 340 */ 341 static int 342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size) 343 { 344 struct vm_object *object; 345 caddr_t buffer; 346 vm_pindex_t npages; 347 int error; 348 349 size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK; 350 if (size < 16384) 351 size = 16384; 352 if (size > 1024*1024) 353 size = 1024*1024; 354 355 npages = round_page(size) / PAGE_SIZE; 356 object = pb->object; 357 358 /* 359 * [re]create the object if necessary and reserve space for it 360 * in the kernel_map. The object and memory are pageable. On 361 * success, free the old resources before assigning the new 362 * ones. 
363 */ 364 if (object == NULL || object->size != npages) { 365 object = vm_object_allocate(OBJT_DEFAULT, npages); 366 buffer = (caddr_t)vm_map_min(&kernel_map); 367 368 error = vm_map_find(&kernel_map, object, NULL, 369 0, (vm_offset_t *)&buffer, size, 370 PAGE_SIZE, TRUE, 371 VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE, 372 VM_PROT_ALL, VM_PROT_ALL, 0); 373 374 if (error != KERN_SUCCESS) { 375 vm_object_deallocate(object); 376 return (ENOMEM); 377 } 378 pipe_free_kmem(pb); 379 pb->object = object; 380 pb->buffer = buffer; 381 pb->size = size; 382 } 383 pb->rindex = 0; 384 pb->windex = 0; 385 386 return (0); 387 } 388 389 /* 390 * Initialize and allocate VM and memory for pipe, pulling the pipe from 391 * our per-cpu cache if possible. 392 * 393 * Returns 0 on success, else an error code (typically ENOMEM). Caller 394 * must still deallocate the pipe on failure. 395 */ 396 static int 397 pipe_create(struct pipe **pipep) 398 { 399 globaldata_t gd = mycpu; 400 struct pipe *pipe; 401 int error; 402 403 if ((pipe = gd->gd_pipeq) != NULL) { 404 gd->gd_pipeq = pipe->next; 405 --gd->gd_pipeqcount; 406 pipe->next = NULL; 407 } else { 408 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO); 409 pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2; 410 lwkt_token_init(&pipe->bufferA.rlock, "piper"); 411 lwkt_token_init(&pipe->bufferA.wlock, "pipew"); 412 lwkt_token_init(&pipe->bufferB.rlock, "piper"); 413 lwkt_token_init(&pipe->bufferB.wlock, "pipew"); 414 } 415 *pipep = pipe; 416 if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) { 417 return (error); 418 } 419 if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) { 420 return (error); 421 } 422 vfs_timestamp(&pipe->ctime); 423 pipe->bufferA.atime = pipe->ctime; 424 pipe->bufferA.mtime = pipe->ctime; 425 pipe->bufferB.atime = pipe->ctime; 426 pipe->bufferB.mtime = pipe->ctime; 427 pipe->open_count = 2; 428 429 return (0); 430 } 431 432 /* 433 * Read data from a pipe 434 */ 435 static int 436 
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) 437 { 438 struct pipebuf *rpb; 439 struct pipebuf *wpb; 440 struct pipe *pipe; 441 size_t nread = 0; 442 size_t size; /* total bytes available */ 443 size_t nsize; /* total bytes to read */ 444 size_t rindex; /* contiguous bytes available */ 445 int notify_writer; 446 int bigread; 447 int bigcount; 448 int error; 449 int nbio; 450 451 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 452 if ((intptr_t)fp->f_data & 1) { 453 rpb = &pipe->bufferB; 454 wpb = &pipe->bufferA; 455 } else { 456 rpb = &pipe->bufferA; 457 wpb = &pipe->bufferB; 458 } 459 atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC); 460 461 if (uio->uio_resid == 0) 462 return(0); 463 464 /* 465 * Calculate nbio 466 */ 467 if (fflags & O_FBLOCKING) 468 nbio = 0; 469 else if (fflags & O_FNONBLOCKING) 470 nbio = 1; 471 else if (fp->f_flag & O_NONBLOCK) 472 nbio = 1; 473 else 474 nbio = 0; 475 476 /* 477 * 'quick' NBIO test before things get expensive. 478 */ 479 if (nbio && rpb->rindex == rpb->windex && 480 (rpb->state & PIPE_REOF) == 0) { 481 return EAGAIN; 482 } 483 484 /* 485 * Reads are serialized. Note however that buffer.buffer and 486 * buffer.size can change out from under us when the number 487 * of bytes in the buffer are zero due to the write-side doing a 488 * pipespace(). 489 */ 490 lwkt_gettoken(&rpb->rlock); 491 error = pipe_start_uio(&rpb->rip); 492 if (error) { 493 lwkt_reltoken(&rpb->rlock); 494 return (error); 495 } 496 notify_writer = 0; 497 498 bigread = (uio->uio_resid > 10 * 1024 * 1024); 499 bigcount = 10; 500 501 while (uio->uio_resid) { 502 /* 503 * Don't hog the cpu. 504 */ 505 if (bigread && --bigcount == 0) { 506 lwkt_user_yield(); 507 bigcount = 10; 508 if (CURSIG(curthread->td_lwp)) { 509 error = EINTR; 510 break; 511 } 512 } 513 514 /* 515 * lfence required to avoid read-reordering of buffer 516 * contents prior to validation of size. 
517 */ 518 size = rpb->windex - rpb->rindex; 519 cpu_lfence(); 520 if (size) { 521 rindex = rpb->rindex & (rpb->size - 1); 522 nsize = size; 523 if (nsize > rpb->size - rindex) 524 nsize = rpb->size - rindex; 525 nsize = szmin(nsize, uio->uio_resid); 526 527 /* 528 * Limit how much we move in one go so we have a 529 * chance to kick the writer while data is still 530 * available in the pipe. This avoids getting into 531 * a ping-pong with the writer. 532 */ 533 if (nsize > (rpb->size >> 1)) 534 nsize = rpb->size >> 1; 535 536 error = uiomove(&rpb->buffer[rindex], nsize, uio); 537 if (error) 538 break; 539 rpb->rindex += nsize; 540 nread += nsize; 541 542 /* 543 * If the FIFO is still over half full just continue 544 * and do not try to notify the writer yet. If 545 * less than half full notify any waiting writer. 546 */ 547 if (size - nsize > (rpb->size >> 1)) { 548 notify_writer = 0; 549 } else { 550 notify_writer = 1; 551 pipesignal(rpb, PIPE_WANTW); 552 } 553 continue; 554 } 555 556 /* 557 * If the "write-side" was blocked we wake it up. This code 558 * is reached when the buffer is completely emptied. 559 */ 560 pipesignal(rpb, PIPE_WANTW); 561 562 /* 563 * Pick up our copy loop again if the writer sent data to 564 * us while we were messing around. 565 * 566 * On a SMP box poll up to pipe_delay nanoseconds for new 567 * data. Typically a value of 2000 to 4000 is sufficient 568 * to eradicate most IPIs/tsleeps/wakeups when a pipe 569 * is used for synchronous communications with small packets, 570 * and 8000 or so (8uS) will pipeline large buffer xfers 571 * between cpus over a pipe. 572 * 573 * For synchronous communications a hit means doing a 574 * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS, 575 * where as miss requiring a tsleep/wakeup sequence 576 * will take 7uS or more. 
577 */ 578 if (rpb->windex != rpb->rindex) 579 continue; 580 581 #ifdef _RDTSC_SUPPORTED_ 582 if (pipe_delay) { 583 int64_t tsc_target; 584 int good = 0; 585 586 tsc_target = tsc_get_target(pipe_delay); 587 while (tsc_test_target(tsc_target) == 0) { 588 cpu_lfence(); 589 if (rpb->windex != rpb->rindex) { 590 good = 1; 591 break; 592 } 593 cpu_pause(); 594 } 595 if (good) 596 continue; 597 } 598 #endif 599 600 /* 601 * Detect EOF condition, do not set error. 602 */ 603 if (rpb->state & PIPE_REOF) 604 break; 605 606 /* 607 * Break if some data was read, or if this was a non-blocking 608 * read. 609 */ 610 if (nread > 0) 611 break; 612 613 if (nbio) { 614 error = EAGAIN; 615 break; 616 } 617 618 /* 619 * Last chance, interlock with WANTR 620 */ 621 tsleep_interlock(rpb, PCATCH); 622 atomic_set_int(&rpb->state, PIPE_WANTR); 623 624 /* 625 * Retest bytes available after memory barrier above. 626 */ 627 size = rpb->windex - rpb->rindex; 628 if (size) 629 continue; 630 631 /* 632 * Retest EOF after memory barrier above. 633 */ 634 if (rpb->state & PIPE_REOF) 635 break; 636 637 /* 638 * Wait for more data or state change 639 */ 640 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0); 641 if (error) 642 break; 643 } 644 pipe_end_uio(&rpb->rip); 645 646 /* 647 * Uptime last access time 648 */ 649 if (error == 0 && nread && rpb->lticks != ticks) { 650 vfs_timestamp(&rpb->atime); 651 rpb->lticks = ticks; 652 } 653 654 /* 655 * If we drained the FIFO more then half way then handle 656 * write blocking hysteresis. 657 * 658 * Note that PIPE_WANTW cannot be set by the writer without 659 * it holding both rlock and wlock, so we can test it 660 * while holding just rlock. 
661 */ 662 if (notify_writer) { 663 /* 664 * Synchronous blocking is done on the pipe involved 665 */ 666 pipesignal(rpb, PIPE_WANTW); 667 668 /* 669 * But we may also have to deal with a kqueue which is 670 * stored on the same pipe as its descriptor, so a 671 * EVFILT_WRITE event waiting for our side to drain will 672 * be on the other side. 673 */ 674 pipewakeup(wpb, 0); 675 } 676 /*size = rpb->windex - rpb->rindex;*/ 677 lwkt_reltoken(&rpb->rlock); 678 679 return (error); 680 } 681 682 static int 683 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) 684 { 685 struct pipebuf *rpb; 686 struct pipebuf *wpb; 687 struct pipe *pipe; 688 size_t windex; 689 size_t space; 690 size_t wcount; 691 size_t orig_resid; 692 int bigwrite; 693 int bigcount; 694 int error; 695 int nbio; 696 697 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 698 if ((intptr_t)fp->f_data & 1) { 699 rpb = &pipe->bufferB; 700 wpb = &pipe->bufferA; 701 } else { 702 rpb = &pipe->bufferA; 703 wpb = &pipe->bufferB; 704 } 705 706 /* 707 * Calculate nbio 708 */ 709 if (fflags & O_FBLOCKING) 710 nbio = 0; 711 else if (fflags & O_FNONBLOCKING) 712 nbio = 1; 713 else if (fp->f_flag & O_NONBLOCK) 714 nbio = 1; 715 else 716 nbio = 0; 717 718 /* 719 * 'quick' NBIO test before things get expensive. 720 */ 721 if (nbio && wpb->size == (wpb->windex - wpb->rindex) && 722 uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) { 723 return EAGAIN; 724 } 725 726 /* 727 * Writes go to the peer. The peer will always exist. 
728 */ 729 lwkt_gettoken(&wpb->wlock); 730 if (wpb->state & PIPE_WEOF) { 731 lwkt_reltoken(&wpb->wlock); 732 return (EPIPE); 733 } 734 735 /* 736 * Degenerate case (EPIPE takes prec) 737 */ 738 if (uio->uio_resid == 0) { 739 lwkt_reltoken(&wpb->wlock); 740 return(0); 741 } 742 743 /* 744 * Writes are serialized (start_uio must be called with wlock) 745 */ 746 error = pipe_start_uio(&wpb->wip); 747 if (error) { 748 lwkt_reltoken(&wpb->wlock); 749 return (error); 750 } 751 752 orig_resid = uio->uio_resid; 753 wcount = 0; 754 755 bigwrite = (uio->uio_resid > 10 * 1024 * 1024); 756 bigcount = 10; 757 758 while (uio->uio_resid) { 759 if (wpb->state & PIPE_WEOF) { 760 error = EPIPE; 761 break; 762 } 763 764 /* 765 * Don't hog the cpu. 766 */ 767 if (bigwrite && --bigcount == 0) { 768 lwkt_user_yield(); 769 bigcount = 10; 770 if (CURSIG(curthread->td_lwp)) { 771 error = EINTR; 772 break; 773 } 774 } 775 776 windex = wpb->windex & (wpb->size - 1); 777 space = wpb->size - (wpb->windex - wpb->rindex); 778 779 /* 780 * Writes of size <= PIPE_BUF must be atomic. 781 */ 782 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 783 space = 0; 784 785 /* 786 * Write to fill, read size handles write hysteresis. Also 787 * additional restrictions can cause select-based non-blocking 788 * writes to spin. 789 */ 790 if (space > 0) { 791 size_t segsize; 792 793 /* 794 * We want to notify a potentially waiting reader 795 * before we exhaust the write buffer for SMP 796 * pipelining. Otherwise the write/read will begin 797 * to ping-pong. 798 */ 799 space = szmin(space, uio->uio_resid); 800 if (space > (wpb->size >> 1)) 801 space = (wpb->size >> 1); 802 803 /* 804 * First segment to transfer is minimum of 805 * transfer size and contiguous space in 806 * pipe buffer. If first segment to transfer 807 * is less than the transfer size, we've got 808 * a wraparound in the buffer. 
809 */ 810 segsize = wpb->size - windex; 811 if (segsize > space) 812 segsize = space; 813 814 /* 815 * If this is the first loop and the reader is 816 * blocked, do a preemptive wakeup of the reader. 817 * 818 * On SMP the IPI latency plus the wlock interlock 819 * on the reader side is the fastest way to get the 820 * reader going. (The scheduler will hard loop on 821 * lock tokens). 822 */ 823 if (wcount == 0) 824 pipesignal(wpb, PIPE_WANTR); 825 826 /* 827 * Transfer segment, which may include a wrap-around. 828 * Update windex to account for both all in one go 829 * so the reader can read() the data atomically. 830 */ 831 error = uiomove(&wpb->buffer[windex], segsize, uio); 832 if (error == 0 && segsize < space) { 833 segsize = space - segsize; 834 error = uiomove(&wpb->buffer[0], segsize, uio); 835 } 836 if (error) 837 break; 838 839 /* 840 * Memory fence prior to windex updating (note: not 841 * needed so this is a NOP on Intel). 842 */ 843 cpu_sfence(); 844 wpb->windex += space; 845 846 /* 847 * Signal reader 848 */ 849 if (wcount != 0) 850 pipesignal(wpb, PIPE_WANTR); 851 wcount += space; 852 continue; 853 } 854 855 /* 856 * Wakeup any pending reader 857 */ 858 pipesignal(wpb, PIPE_WANTR); 859 860 /* 861 * don't block on non-blocking I/O 862 */ 863 if (nbio) { 864 error = EAGAIN; 865 break; 866 } 867 868 #ifdef _RDTSC_SUPPORTED_ 869 if (pipe_delay) { 870 int64_t tsc_target; 871 int good = 0; 872 873 tsc_target = tsc_get_target(pipe_delay); 874 while (tsc_test_target(tsc_target) == 0) { 875 cpu_lfence(); 876 space = wpb->size - (wpb->windex - wpb->rindex); 877 if ((space < uio->uio_resid) && 878 (orig_resid <= PIPE_BUF)) { 879 space = 0; 880 } 881 if (space) { 882 good = 1; 883 break; 884 } 885 cpu_pause(); 886 } 887 if (good) 888 continue; 889 } 890 #endif 891 892 /* 893 * Interlocked test. Atomic op enforces the memory barrier. 
894 */ 895 tsleep_interlock(wpb, PCATCH); 896 atomic_set_int(&wpb->state, PIPE_WANTW); 897 898 /* 899 * Retest space available after memory barrier above. 900 * Writes of size <= PIPE_BUF must be atomic. 901 */ 902 space = wpb->size - (wpb->windex - wpb->rindex); 903 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 904 space = 0; 905 906 /* 907 * Retest EOF after memory barrier above. 908 */ 909 if (wpb->state & PIPE_WEOF) { 910 error = EPIPE; 911 break; 912 } 913 914 /* 915 * We have no more space and have something to offer, 916 * wake up select/poll/kq. 917 */ 918 if (space == 0) { 919 pipewakeup(wpb, 1); 920 error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0); 921 } 922 923 /* 924 * Break out if we errored or the read side wants us to go 925 * away. 926 */ 927 if (error) 928 break; 929 if (wpb->state & PIPE_WEOF) { 930 error = EPIPE; 931 break; 932 } 933 } 934 pipe_end_uio(&wpb->wip); 935 936 /* 937 * If we have put any characters in the buffer, we wake up 938 * the reader. 939 * 940 * Both rlock and wlock are required to be able to modify pipe_state. 941 */ 942 if (wpb->windex != wpb->rindex) { 943 pipesignal(wpb, PIPE_WANTR); 944 pipewakeup(wpb, 1); 945 } 946 947 /* 948 * Don't return EPIPE if I/O was successful 949 */ 950 if ((wpb->rindex == wpb->windex) && 951 (uio->uio_resid == 0) && 952 (error == EPIPE)) { 953 error = 0; 954 } 955 956 if (error == 0 && wpb->lticks != ticks) { 957 vfs_timestamp(&wpb->mtime); 958 wpb->lticks = ticks; 959 } 960 961 /* 962 * We have something to offer, 963 * wake up select/poll/kq. 964 */ 965 /*space = wpb->windex - wpb->rindex;*/ 966 lwkt_reltoken(&wpb->wlock); 967 968 return (error); 969 } 970 971 /* 972 * we implement a very minimal set of ioctls for compatibility with sockets. 
973 */ 974 static int 975 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, 976 struct ucred *cred, struct sysmsg *msg) 977 { 978 struct pipebuf *rpb; 979 struct pipe *pipe; 980 int error; 981 982 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 983 if ((intptr_t)fp->f_data & 1) { 984 rpb = &pipe->bufferB; 985 } else { 986 rpb = &pipe->bufferA; 987 } 988 989 lwkt_gettoken(&rpb->rlock); 990 lwkt_gettoken(&rpb->wlock); 991 992 switch (cmd) { 993 case FIOASYNC: 994 if (*(int *)data) { 995 atomic_set_int(&rpb->state, PIPE_ASYNC); 996 } else { 997 atomic_clear_int(&rpb->state, PIPE_ASYNC); 998 } 999 error = 0; 1000 break; 1001 case FIONREAD: 1002 *(int *)data = (int)(rpb->windex - rpb->rindex); 1003 error = 0; 1004 break; 1005 case FIOSETOWN: 1006 error = fsetown(*(int *)data, &rpb->sigio); 1007 break; 1008 case FIOGETOWN: 1009 *(int *)data = fgetown(&rpb->sigio); 1010 error = 0; 1011 break; 1012 case TIOCSPGRP: 1013 /* This is deprecated, FIOSETOWN should be used instead. */ 1014 error = fsetown(-(*(int *)data), &rpb->sigio); 1015 break; 1016 1017 case TIOCGPGRP: 1018 /* This is deprecated, FIOGETOWN should be used instead. 
*/ 1019 *(int *)data = -fgetown(&rpb->sigio); 1020 error = 0; 1021 break; 1022 default: 1023 error = ENOTTY; 1024 break; 1025 } 1026 lwkt_reltoken(&rpb->wlock); 1027 lwkt_reltoken(&rpb->rlock); 1028 1029 return (error); 1030 } 1031 1032 /* 1033 * MPSAFE 1034 */ 1035 static int 1036 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred) 1037 { 1038 struct pipebuf *rpb; 1039 struct pipe *pipe; 1040 1041 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 1042 if ((intptr_t)fp->f_data & 1) { 1043 rpb = &pipe->bufferB; 1044 } else { 1045 rpb = &pipe->bufferA; 1046 } 1047 1048 bzero((caddr_t)ub, sizeof(*ub)); 1049 ub->st_mode = S_IFIFO; 1050 ub->st_blksize = rpb->size; 1051 ub->st_size = rpb->windex - rpb->rindex; 1052 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 1053 ub->st_atimespec = rpb->atime; 1054 ub->st_mtimespec = rpb->mtime; 1055 ub->st_ctimespec = pipe->ctime; 1056 ub->st_uid = fp->f_cred->cr_uid; 1057 ub->st_gid = fp->f_cred->cr_gid; 1058 ub->st_ino = pipe->inum; 1059 /* 1060 * Left as 0: st_dev, st_nlink, st_rdev, 1061 * st_flags, st_gen. 1062 * XXX (st_dev, st_ino) should be unique. 1063 */ 1064 1065 return (0); 1066 } 1067 1068 static int 1069 pipe_close(struct file *fp) 1070 { 1071 struct pipebuf *rpb; 1072 struct pipebuf *wpb; 1073 struct pipe *pipe; 1074 1075 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 1076 if ((intptr_t)fp->f_data & 1) { 1077 rpb = &pipe->bufferB; 1078 wpb = &pipe->bufferA; 1079 } else { 1080 rpb = &pipe->bufferA; 1081 wpb = &pipe->bufferB; 1082 } 1083 1084 fp->f_ops = &badfileops; 1085 fp->f_data = NULL; 1086 funsetown(&rpb->sigio); 1087 pipeclose(pipe, rpb, wpb); 1088 1089 return (0); 1090 } 1091 1092 /* 1093 * Shutdown one or both directions of a full-duplex pipe. 
1094 */ 1095 static int 1096 pipe_shutdown(struct file *fp, int how) 1097 { 1098 struct pipebuf *rpb; 1099 struct pipebuf *wpb; 1100 struct pipe *pipe; 1101 int error = EPIPE; 1102 1103 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 1104 if ((intptr_t)fp->f_data & 1) { 1105 rpb = &pipe->bufferB; 1106 wpb = &pipe->bufferA; 1107 } else { 1108 rpb = &pipe->bufferA; 1109 wpb = &pipe->bufferB; 1110 } 1111 1112 /* 1113 * We modify pipe_state on both pipes, which means we need 1114 * all four tokens! 1115 */ 1116 lwkt_gettoken(&rpb->rlock); 1117 lwkt_gettoken(&rpb->wlock); 1118 lwkt_gettoken(&wpb->rlock); 1119 lwkt_gettoken(&wpb->wlock); 1120 1121 switch(how) { 1122 case SHUT_RDWR: 1123 case SHUT_RD: 1124 /* 1125 * EOF on my reads and peer writes 1126 */ 1127 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF); 1128 if (rpb->state & PIPE_WANTR) { 1129 rpb->state &= ~PIPE_WANTR; 1130 wakeup(rpb); 1131 } 1132 if (rpb->state & PIPE_WANTW) { 1133 rpb->state &= ~PIPE_WANTW; 1134 wakeup(rpb); 1135 } 1136 error = 0; 1137 if (how == SHUT_RD) 1138 break; 1139 /* fall through */ 1140 case SHUT_WR: 1141 /* 1142 * EOF on peer reads and my writes 1143 */ 1144 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF); 1145 if (wpb->state & PIPE_WANTR) { 1146 wpb->state &= ~PIPE_WANTR; 1147 wakeup(wpb); 1148 } 1149 if (wpb->state & PIPE_WANTW) { 1150 wpb->state &= ~PIPE_WANTW; 1151 wakeup(wpb); 1152 } 1153 error = 0; 1154 break; 1155 } 1156 pipewakeup(rpb, 1); 1157 pipewakeup(wpb, 1); 1158 1159 lwkt_reltoken(&wpb->wlock); 1160 lwkt_reltoken(&wpb->rlock); 1161 lwkt_reltoken(&rpb->wlock); 1162 lwkt_reltoken(&rpb->rlock); 1163 1164 return (error); 1165 } 1166 1167 /* 1168 * Destroy the pipe buffer. 1169 */ 1170 static void 1171 pipe_free_kmem(struct pipebuf *pb) 1172 { 1173 if (pb->buffer != NULL) { 1174 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size); 1175 pb->buffer = NULL; 1176 pb->object = NULL; 1177 } 1178 } 1179 1180 /* 1181 * Close one half of the pipe. 
We are closing the pipe for reading on rpb 1182 * and writing on wpb. This routine must be called twice with the pipebufs 1183 * reversed to close both directions. 1184 */ 1185 static void 1186 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb) 1187 { 1188 globaldata_t gd; 1189 1190 if (pipe == NULL) 1191 return; 1192 1193 /* 1194 * We need both the read and write tokens to modify pipe_state. 1195 */ 1196 lwkt_gettoken(&rpb->rlock); 1197 lwkt_gettoken(&rpb->wlock); 1198 1199 /* 1200 * Set our state, wakeup anyone waiting in select/poll/kq, and 1201 * wakeup anyone blocked on our pipe. No action if our side 1202 * is already closed. 1203 */ 1204 if (rpb->state & PIPE_CLOSED) { 1205 lwkt_reltoken(&rpb->wlock); 1206 lwkt_reltoken(&rpb->rlock); 1207 return; 1208 } 1209 1210 atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF); 1211 pipewakeup(rpb, 1); 1212 if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) { 1213 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW); 1214 wakeup(rpb); 1215 } 1216 lwkt_reltoken(&rpb->wlock); 1217 lwkt_reltoken(&rpb->rlock); 1218 1219 /* 1220 * Disconnect from peer. 1221 */ 1222 lwkt_gettoken(&wpb->rlock); 1223 lwkt_gettoken(&wpb->wlock); 1224 1225 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF); 1226 pipewakeup(wpb, 1); 1227 if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) { 1228 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW); 1229 wakeup(wpb); 1230 } 1231 if (SLIST_FIRST(&wpb->kq.ki_note)) 1232 KNOTE(&wpb->kq.ki_note, 0); 1233 lwkt_reltoken(&wpb->wlock); 1234 lwkt_reltoken(&wpb->rlock); 1235 1236 /* 1237 * Free resources once both sides are closed. We maintain a pcpu 1238 * cache to improve performance, so the actual tear-down case is 1239 * limited to bulk situations. 1240 * 1241 * However, the bulk tear-down case can cause intense contention 1242 * on the kernel_map when, e.g. hundreds to hundreds of thousands 1243 * of processes are killed at the same time. 
To deal with this we 1244 * use a pcpu mutex to maintain concurrency but also limit the 1245 * number of threads banging on the map and pmap. 1246 * 1247 * We use the mtx mechanism instead of the lockmgr mechanism because 1248 * the mtx mechanism utilizes a queued design which will not break 1249 * down in the face of thousands to hundreds of thousands of 1250 * processes trying to free pipes simultaneously. The lockmgr 1251 * mechanism will wind up waking them all up each time a lock 1252 * cycles. 1253 */ 1254 if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) { 1255 gd = mycpu; 1256 if (gd->gd_pipeqcount >= pipe_maxcache) { 1257 mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx); 1258 pipe_free_kmem(rpb); 1259 pipe_free_kmem(wpb); 1260 mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx); 1261 kfree(pipe, M_PIPE); 1262 } else { 1263 rpb->state = 0; 1264 wpb->state = 0; 1265 pipe->next = gd->gd_pipeq; 1266 gd->gd_pipeq = pipe; 1267 ++gd->gd_pipeqcount; 1268 } 1269 } 1270 } 1271 1272 static int 1273 pipe_kqfilter(struct file *fp, struct knote *kn) 1274 { 1275 struct pipebuf *rpb; 1276 struct pipebuf *wpb; 1277 struct pipe *pipe; 1278 1279 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); 1280 if ((intptr_t)fp->f_data & 1) { 1281 rpb = &pipe->bufferB; 1282 wpb = &pipe->bufferA; 1283 } else { 1284 rpb = &pipe->bufferA; 1285 wpb = &pipe->bufferB; 1286 } 1287 1288 switch (kn->kn_filter) { 1289 case EVFILT_READ: 1290 kn->kn_fop = &pipe_rfiltops; 1291 break; 1292 case EVFILT_WRITE: 1293 kn->kn_fop = &pipe_wfiltops; 1294 if (wpb->state & PIPE_CLOSED) { 1295 /* other end of pipe has been closed */ 1296 return (EPIPE); 1297 } 1298 break; 1299 default: 1300 return (EOPNOTSUPP); 1301 } 1302 1303 if (rpb == &pipe->bufferA) 1304 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0); 1305 else 1306 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1); 1307 1308 knote_insert(&rpb->kq.ki_note, kn); 1309 1310 return (0); 1311 } 1312 1313 static void 1314 filt_pipedetach(struct knote *kn) 
1315 { 1316 struct pipebuf *rpb; 1317 struct pipebuf *wpb; 1318 struct pipe *pipe; 1319 1320 pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1); 1321 if ((intptr_t)kn->kn_hook & 1) { 1322 rpb = &pipe->bufferB; 1323 wpb = &pipe->bufferA; 1324 } else { 1325 rpb = &pipe->bufferA; 1326 wpb = &pipe->bufferB; 1327 } 1328 knote_remove(&rpb->kq.ki_note, kn); 1329 } 1330 1331 /*ARGSUSED*/ 1332 static int 1333 filt_piperead(struct knote *kn, long hint) 1334 { 1335 struct pipebuf *rpb; 1336 struct pipebuf *wpb; 1337 struct pipe *pipe; 1338 int ready = 0; 1339 1340 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1); 1341 if ((intptr_t)kn->kn_fp->f_data & 1) { 1342 rpb = &pipe->bufferB; 1343 wpb = &pipe->bufferA; 1344 } else { 1345 rpb = &pipe->bufferA; 1346 wpb = &pipe->bufferB; 1347 } 1348 1349 /* 1350 * We shouldn't need the pipe locks because the knote itself is 1351 * locked via KN_PROCESSING. If we lose a race against the writer, 1352 * the writer will just issue a KNOTE() after us. 
1353 */ 1354 #if 0 1355 lwkt_gettoken(&rpb->rlock); 1356 lwkt_gettoken(&rpb->wlock); 1357 #endif 1358 1359 kn->kn_data = rpb->windex - rpb->rindex; 1360 if (kn->kn_data < 0) 1361 kn->kn_data = 0; 1362 1363 if (rpb->state & PIPE_REOF) { 1364 /* 1365 * Only set NODATA if all data has been exhausted 1366 */ 1367 if (kn->kn_data == 0) 1368 kn->kn_flags |= EV_NODATA; 1369 kn->kn_flags |= EV_EOF; 1370 ready = 1; 1371 } 1372 1373 #if 0 1374 lwkt_reltoken(&rpb->wlock); 1375 lwkt_reltoken(&rpb->rlock); 1376 #endif 1377 1378 if (!ready) 1379 ready = kn->kn_data > 0; 1380 1381 return (ready); 1382 } 1383 1384 /*ARGSUSED*/ 1385 static int 1386 filt_pipewrite(struct knote *kn, long hint) 1387 { 1388 struct pipebuf *rpb; 1389 struct pipebuf *wpb; 1390 struct pipe *pipe; 1391 int ready = 0; 1392 1393 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1); 1394 if ((intptr_t)kn->kn_fp->f_data & 1) { 1395 rpb = &pipe->bufferB; 1396 wpb = &pipe->bufferA; 1397 } else { 1398 rpb = &pipe->bufferA; 1399 wpb = &pipe->bufferB; 1400 } 1401 1402 kn->kn_data = 0; 1403 if (wpb->state & PIPE_CLOSED) { 1404 kn->kn_flags |= (EV_EOF | EV_NODATA); 1405 return (1); 1406 } 1407 1408 /* 1409 * We shouldn't need the pipe locks because the knote itself is 1410 * locked via KN_PROCESSING. If we lose a race against the reader, 1411 * the writer will just issue a KNOTE() after us. 1412 */ 1413 #if 0 1414 lwkt_gettoken(&wpb->rlock); 1415 lwkt_gettoken(&wpb->wlock); 1416 #endif 1417 1418 if (wpb->state & PIPE_WEOF) { 1419 kn->kn_flags |= (EV_EOF | EV_NODATA); 1420 ready = 1; 1421 } 1422 1423 if (!ready) { 1424 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex); 1425 if (kn->kn_data < 0) 1426 kn->kn_data = 0; 1427 } 1428 1429 #if 0 1430 lwkt_reltoken(&wpb->wlock); 1431 lwkt_reltoken(&wpb->rlock); 1432 #endif 1433 1434 if (!ready) 1435 ready = kn->kn_data >= PIPE_BUF; 1436 1437 return (ready); 1438 } 1439