/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

struct pipegdlock {
	struct mtx	mtx;
} __cachealign;

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

__read_mostly static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

__read_mostly static struct filterops pipe_rfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
	CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the writer completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;	/* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static
void
pipeinit(void *dummy)
{
	size_t mbytes = kmem_lim_size();
	int n;

	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
		if (mbytes >= 7 * 1024)
			pipe_maxcache *= 2;
		if (mbytes >= 15 * 1024)
			pipe_maxcache *= 2;
	}

	/*
	 * Detune the pcpu caching a bit on systems with an insane number
	 * of cpu threads to reduce memory waste.
	 */
	if (ncpus > 64) {
		pipe_maxcache = pipe_maxcache * 64 / ncpus;
		if (pipe_maxcache < PIPEQ_MAX_CACHE)
			pipe_maxcache = PIPEQ_MAX_CACHE;
	}

	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
			       M_PIPE, M_WAITOK | M_ZERO);
	for (n = 0; n < ncpus; ++n)
		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
		struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = pb->state;
		cpu_ccfence();
		nflags = oflags & ~flags;
		if (atomic_cmpset_int(&pb->state, oflags, nflags))
			break;
	}
	if (oflags & flags)
		wakeup(pb);
}

/*
 * Wakeup select/poll/kqueue consumers and, if requested and the pipe
 * is in async mode, post SIGIO to the registered owner.
 */
static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
		lwkt_gettoken(&sigio_token);
		pgsigio(pb->sigio, SIGIO, 0);
		lwkt_reltoken(&sigio_token);
	}
	KNOTE(&pb->kq.ki_note, 0);
}
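
/*
 * Illustrative sketch (not part of the build): pipesignal() above is one
 * half of the lost-wakeup protocol used throughout this file.  The waiter
 * publishes its intent with tsleep_interlock() plus an atomic flag set,
 * re-tests the condition, and only then sleeps; the other side clears the
 * flag and wakeup()s.  A minimal waiter-side sketch, assuming a pipebuf
 * *pb and a hypothetical condition test data_available():
 */
#if 0
static int
wait_for_data(struct pipebuf *pb)
{
	int error;

	for (;;) {
		if (data_available(pb))		/* hypothetical test */
			return (0);
		tsleep_interlock(pb, PCATCH);
		atomic_set_int(&pb->state, PIPE_WANTR);
		if (data_available(pb))		/* re-test after barrier */
			return (0);
		error = tsleep(pb, PCATCH | PINTERLOCKED, "pipewt", 0);
		if (error)
			return (error);
	}
}
#endif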

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct pipe_args *uap)
{
	return kern_pipe(uap->sysmsg_fds, 0);
}

int
sys_pipe2(struct pipe2_args *uap)
{
	return kern_pipe(uap->sysmsg_fds, uap->flags);
}

int
kern_pipe(long *fds, int flags)
{
	struct thread *td = curthread;
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *pipe;
	int fd1, fd2, error;

	pipe = NULL;
	if (pipe_create(&pipe)) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (ENFILE);
	}

	error = falloc(td->td_lwp, &rf, &fd1);
	if (error) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = (void *)((intptr_t)pipe | 0);
	if (flags & O_NONBLOCK)
		rf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

	error = falloc(td->td_lwp, &wf, &fd2);
	if (error) {
		fsetfd(fdp, NULL, fd1);
		fdrop(rf);
		/* pipeA has been closed by fdrop() */
		/* close pipeB here */
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = (void *)((intptr_t)pipe | 1);
	if (flags & O_NONBLOCK)
		wf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

	fds[1] = fd2;

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(fdp, rf, fd1);
	fsetfd(fdp, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}
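
/*
 * Hypothetical userland illustration (not part of the kernel build): how
 * the two descriptors produced above are typically consumed.  fds[0] is
 * the read side and fds[1] the write side; O_NONBLOCK and O_CLOEXEC are
 * applied per-descriptor exactly as kern_pipe() sets them up.
 */
#if 0
#include <unistd.h>
#include <fcntl.h>

static void
pipe2_demo(void)
{
	int fds[2];
	char buf[8];

	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == 0) {
		(void)write(fds[1], "hi", 2);		/* write side */
		(void)read(fds[0], buf, sizeof(buf));	/* read side */
		close(fds[0]);
		close(fds[1]);
	}
}
#endif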

/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
	struct vm_object *object;
	caddr_t buffer;
	vm_pindex_t npages;
	int error;

	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
	if (size < 16384)
		size = 16384;
	if (size > 1024*1024)
		size = 1024*1024;

	npages = round_page(size) / PAGE_SIZE;
	object = pb->object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(&kernel_map);

		error = vm_map_find(&kernel_map, object, NULL,
				    0, (vm_offset_t *)&buffer, size,
				    PAGE_SIZE, TRUE,
				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
				    VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_free_kmem(pb);
		pb->object = object;
		pb->buffer = buffer;
		pb->size = size;
	}
	pb->rindex = 0;
	pb->windex = 0;

	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
	globaldata_t gd = mycpu;
	struct pipe *pipe;
	int error;

	if ((pipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = pipe->next;
		--gd->gd_pipeqcount;
		pipe->next = NULL;
	} else {
		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
		lwkt_token_init(&pipe->bufferA.rlock, "piper");
		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
		lwkt_token_init(&pipe->bufferB.rlock, "piper");
		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
	}
	*pipep = pipe;
	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
		return (error);
	}
	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
		return (error);
	}
	vfs_timestamp(&pipe->ctime);
	pipe->bufferA.atime = pipe->ctime;
	pipe->bufferA.mtime = pipe->ctime;
	pipe->bufferB.atime = pipe->ctime;
	pipe->bufferB.mtime = pipe->ctime;
	pipe->open_count = 2;

	return (0);
}
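
/*
 * Illustrative sketch (not part of the build): rindex and windex are
 * free-running unsigned counters.  The byte count in the FIFO is always
 * windex - rindex, which stays correct across integer wrap, and the
 * physical buffer offset is taken with a mask, which assumes the buffer
 * size is a power of 2.  Worked example with size = 32768:
 */
#if 0
static void
ring_index_demo(void)
{
	size_t size = 32768;			/* power-of-2 buffer */
	size_t rindex = 65530;			/* free-running read counter */
	size_t windex = 65542;			/* free-running write counter */
	size_t avail = windex - rindex;		/* 12 bytes in the FIFO */
	size_t offset = rindex & (size - 1);	/* 32762: physical offset */
	size_t contig = size - offset;		/* 6 contiguous, then wrap */

	KKASSERT(avail == 12 && contig == 6);
}
#endif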

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t nread = 0;
	size_t size;	/* total bytes available */
	size_t nsize;	/* total bytes to read */
	size_t rindex;	/* contiguous bytes available */
	int notify_writer;
	int bigread;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && rpb->rindex == rpb->windex &&
	    (rpb->state & PIPE_REOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Reads are serialized.  Note however that buffer.buffer and
	 * buffer.size can change out from under us when the number
	 * of bytes in the buffer is zero due to the write-side doing a
	 * pipespace().
	 */
	lwkt_gettoken(&rpb->rlock);
	error = pipe_start_uio(&rpb->rip);
	if (error) {
		lwkt_reltoken(&rpb->rlock);
		return (error);
	}
	notify_writer = 0;

	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		/*
		 * lfence required to avoid read-reordering of buffer
		 * contents prior to validation of size.
		 */
		size = rpb->windex - rpb->rindex;
		cpu_lfence();
		if (size) {
			rindex = rpb->rindex & (rpb->size - 1);
			nsize = size;
			if (nsize > rpb->size - rindex)
				nsize = rpb->size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			/*
			 * Limit how much we move in one go so we have a
			 * chance to kick the writer while data is still
			 * available in the pipe.  This avoids getting into
			 * a ping-pong with the writer.
			 */
			if (nsize > (rpb->size >> 1))
				nsize = rpb->size >> 1;

			error = uiomove(&rpb->buffer[rindex], nsize, uio);
			if (error)
				break;
			rpb->rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.  If
			 * less than half full notify any waiting writer.
			 */
			if (size - nsize > (rpb->size >> 1)) {
				notify_writer = 0;
			} else {
				notify_writer = 1;
				pipesignal(rpb, PIPE_WANTW);
			}
			continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached when the buffer is completely emptied.
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On an SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpb->windex != rpb->rindex)
			continue;

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				if (rpb->windex != rpb->rindex) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif
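
		/*
		 * Note on the spin above (illustrative): tsc_get_target()
		 * converts the pipe_delay budget (nanoseconds) into an
		 * absolute TSC deadline, and tsc_test_target() returns
		 * nonzero once that deadline has passed.  With the default
		 * 4000ns budget, a synchronous partner that responds within
		 * the ~2uS hit case described above is absorbed here
		 * without any tsleep/wakeup round trip.
		 */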

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR
		 */
		tsleep_interlock(rpb, PCATCH);
		atomic_set_int(&rpb->state, PIPE_WANTR);

		/*
		 * Retest bytes available after memory barrier above.
		 */
		size = rpb->windex - rpb->rindex;
		if (size)
			continue;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Wait for more data or state change
		 */
		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
		if (error)
			break;
	}
	pipe_end_uio(&rpb->rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread && rpb->lticks != ticks) {
		vfs_timestamp(&rpb->atime);
		rpb->lticks = ticks;
	}

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		/*
		 * Synchronous blocking is done on the pipe involved
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * But we may also have to deal with a kqueue which is
		 * stored on the same pipe as its descriptor, so an
		 * EVFILT_WRITE event waiting for our side to drain will
		 * be on the other side.
		 */
		pipewakeup(wpb, 0);
	}
	/*size = rpb->windex - rpb->rindex;*/
	lwkt_reltoken(&rpb->rlock);

	return (error);
}
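
/*
 * Illustrative note (not part of the build): both ends of a pipe share a
 * single struct pipe.  fp->f_data stores the pipe pointer with the low
 * bit selecting which pipebuf is "ours"; pipe_read() above and
 * pipe_write() below both begin with this decode.  Fragment:
 */
#if 0
	struct pipe *pipe;
	struct pipebuf *mine;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	mine = ((intptr_t)fp->f_data & 1) ? &pipe->bufferB : &pipe->bufferA;
#endif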

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t windex;
	size_t space;
	size_t wcount;
	size_t orig_resid;
	int bigwrite;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	lwkt_gettoken(&wpb->wlock);
	if (wpb->state & PIPE_WEOF) {
		lwkt_reltoken(&wpb->wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		lwkt_reltoken(&wpb->wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(&wpb->wip);
	if (error) {
		lwkt_reltoken(&wpb->wlock);
		return (error);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpb->windex & (wpb->size - 1);
		space = wpb->size - (wpb->windex - wpb->rindex);

		/*
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			size_t segsize;

			/*
			 * We want to notify a potentially waiting reader
			 * before we exhaust the write buffer for SMP
			 * pipelining.  Otherwise the write/read will begin
			 * to ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > (wpb->size >> 1))
				space = (wpb->size >> 1);

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpb->size - windex;
			if (segsize > space)
				segsize = space;

			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 */
			if (wcount == 0)
				pipesignal(wpb, PIPE_WANTR);

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both in one go
			 * so the reader can read() the data atomically.
			 */
			error = uiomove(&wpb->buffer[windex], segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpb->buffer[0], segsize, uio);
			}
			if (error)
				break;

			/*
			 * Memory fence prior to windex updating (note: not
			 * needed so this is a NOP on Intel).
			 */
			cpu_sfence();
			wpb->windex += space;

			/*
			 * Signal reader
			 */
			if (wcount != 0)
				pipesignal(wpb, PIPE_WANTR);
			wcount += space;
			continue;
		}

		/*
		 * Wakeup any pending reader
		 */
		pipesignal(wpb, PIPE_WANTR);

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			error = EAGAIN;
			break;
		}

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				space = wpb->size - (wpb->windex - wpb->rindex);
				if ((space < uio->uio_resid) &&
				    (orig_resid <= PIPE_BUF)) {
					space = 0;
				}
				if (space) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Interlocked test.  Atomic op enforces the memory barrier.
		 */
		tsleep_interlock(wpb, PCATCH);
		atomic_set_int(&wpb->state, PIPE_WANTW);

		/*
		 * Retest space available after memory barrier above.
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		space = wpb->size - (wpb->windex - wpb->rindex);
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll/kq.
		 */
		if (space == 0) {
			pipewakeup(wpb, 1);
			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
		}

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(&wpb->wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpb->windex != wpb->rindex) {
		pipesignal(wpb, PIPE_WANTR);
		pipewakeup(wpb, 1);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpb->rindex == wpb->windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0 && wpb->lticks != ticks) {
		vfs_timestamp(&wpb->mtime);
		wpb->lticks = ticks;
	}

	/*
	 * We have something to offer,
	 * wake up select/poll/kq.
	 */
	/*space = wpb->windex - wpb->rindex;*/
	lwkt_reltoken(&wpb->wlock);

	return (error);
}
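
/*
 * Hypothetical userland illustration (not part of the kernel build): the
 * PIPE_BUF guarantee enforced above.  A write of PIPE_BUF bytes or less
 * either fits in the buffer in its entirety or the writer waits for
 * space, so records from concurrent writers are never interleaved
 * mid-record.
 */
#if 0
#include <unistd.h>
#include <limits.h>

static void
atomic_record_write(int wfd, const char *rec, size_t len)
{
	/* len <= PIPE_BUF: the kernel will not interleave this write */
	if (len <= PIPE_BUF)
		(void)write(wfd, rec, len);
}
#endif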

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	   struct ucred *cred, struct sysmsg *msg)
{
	struct pipebuf *rpb;
	struct pipe *pipe;
	int error;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			atomic_set_int(&rpb->state, PIPE_ASYNC);
		} else {
			atomic_clear_int(&rpb->state, PIPE_ASYNC);
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = (int)(rpb->windex - rpb->rindex);
		error = 0;
		break;
	case FIOSETOWN:
		error = fsetown(*(int *)data, &rpb->sigio);
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(&rpb->sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		error = fsetown(-(*(int *)data), &rpb->sigio);
		break;

	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(&rpb->sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}
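
/*
 * Hypothetical userland illustration (not part of the kernel build):
 * exercising the minimal ioctl surface above.  FIONREAD reports the
 * bytes pending in the read side's FIFO, i.e. windex - rindex at the
 * time of the call; FIOASYNC arms SIGIO delivery through the sigio
 * state managed by FIOSETOWN.
 */
#if 0
#include <sys/ioctl.h>
#include <sys/filio.h>

static int
pending_bytes(int rfd)
{
	int n = 0;

	if (ioctl(rfd, FIONREAD, &n) < 0)
		return (-1);
	return (n);
}
#endif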

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipebuf *rpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = rpb->size;
	ub->st_size = rpb->windex - rpb->rindex;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = rpb->atime;
	ub->st_mtimespec = rpb->mtime;
	ub->st_ctimespec = pipe->ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	ub->st_ino = pipe->inum;
	/*
	 * Left as 0: st_dev, st_nlink, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */

	return (0);
}

static int
pipe_close(struct file *fp)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(&rpb->sigio);
	pipeclose(pipe, rpb, wpb);

	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int error = EPIPE;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		/*
		 * EOF on my reads and peer writes
		 */
		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
		if (rpb->state & PIPE_WANTR) {
			rpb->state &= ~PIPE_WANTR;
			wakeup(rpb);
		}
		if (rpb->state & PIPE_WANTW) {
			rpb->state &= ~PIPE_WANTW;
			wakeup(rpb);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		/*
		 * EOF on peer reads and my writes
		 */
		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
		if (wpb->state & PIPE_WANTR) {
			wpb->state &= ~PIPE_WANTR;
			wakeup(wpb);
		}
		if (wpb->state & PIPE_WANTW) {
			wpb->state &= ~PIPE_WANTW;
			wakeup(wpb);
		}
		error = 0;
		break;
	}
	pipewakeup(rpb, 1);
	pipewakeup(wpb, 1);

	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
	if (pb->buffer != NULL) {
		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
		pb->buffer = NULL;
		pb->object = NULL;
	}
}

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
	globaldata_t gd;

	if (pipe == NULL)
		return;

	/*
	 * We need both the read and write tokens to modify pipe_state.
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	/*
	 * Set our state, wakeup anyone waiting in select/poll/kq, and
	 * wakeup anyone blocked on our pipe.  No action if our side
	 * is already closed.
	 */
	if (rpb->state & PIPE_CLOSED) {
		lwkt_reltoken(&rpb->wlock);
		lwkt_reltoken(&rpb->rlock);
		return;
	}

	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
	pipewakeup(rpb, 1);
	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(rpb);
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	/*
	 * Disconnect from peer.
	 */
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
	pipewakeup(wpb, 1);
	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(wpb);
	}
	if (SLIST_FIRST(&wpb->kq.ki_note))
		KNOTE(&wpb->kq.ki_note, 0);
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);

	/*
	 * Free resources once both sides are closed.  We maintain a pcpu
	 * cache to improve performance, so the actual tear-down case is
	 * limited to bulk situations.
	 *
	 * However, the bulk tear-down case can cause intense contention
	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
	 * of processes are killed at the same time.  To deal with this we
	 * use a pcpu mutex to maintain concurrency but also limit the
	 * number of threads banging on the map and pmap.
	 *
	 * We use the mtx mechanism instead of the lockmgr mechanism because
	 * the mtx mechanism utilizes a queued design which will not break
	 * down in the face of thousands to hundreds of thousands of
	 * processes trying to free pipes simultaneously.  The lockmgr
	 * mechanism will wind up waking them all up each time a lock
	 * cycles.
	 */
	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
		gd = mycpu;
		if (gd->gd_pipeqcount >= pipe_maxcache) {
			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			pipe_free_kmem(rpb);
			pipe_free_kmem(wpb);
			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			kfree(pipe, M_PIPE);
		} else {
			rpb->state = 0;
			wpb->state = 0;
			pipe->next = gd->gd_pipeq;
			gd->gd_pipeq = pipe;
			++gd->gd_pipeqcount;
		}
	}
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		if (wpb->state & PIPE_CLOSED) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		break;
	default:
		return (EOPNOTSUPP);
	}

	if (rpb == &pipe->bufferA)
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
	else
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

	knote_insert(&rpb->kq.ki_note, kn);

	return (0);
}
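
/*
 * Hypothetical userland illustration (not part of the kernel build):
 * registering for EVFILT_READ on the read end of a pipe.  The filter
 * installed above reports windex - rindex in kn_data and sets EV_EOF
 * once the write side is gone.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

static int
wait_readable(int kq, int rfd)
{
	struct kevent kev;

	EV_SET(&kev, rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
		return (-1);
	/* block until data is pending or the writer closes */
	return (kevent(kq, NULL, 0, &kev, 1, NULL));
}
#endif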

static void
filt_pipedetach(struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
	if ((intptr_t)kn->kn_hook & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	knote_remove(&rpb->kq.ki_note, kn);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the writer,
	 * the writer will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
#endif

	kn->kn_data = rpb->windex - rpb->rindex;
	if (kn->kn_data < 0)
		kn->kn_data = 0;

	if (rpb->state & PIPE_REOF) {
		/*
		 * Only set NODATA if all data has been exhausted
		 */
		if (kn->kn_data == 0)
			kn->kn_flags |= EV_NODATA;
		kn->kn_flags |= EV_EOF;
		ready = 1;
	}

#if 0
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);
#endif

	if (!ready)
		ready = kn->kn_data > 0;

	return (ready);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	kn->kn_data = 0;
	if (wpb->state & PIPE_CLOSED) {
		kn->kn_flags |= (EV_EOF | EV_NODATA);
		return (1);
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the reader,
	 * the reader will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);
#endif

	if (wpb->state & PIPE_WEOF) {
		kn->kn_flags |= (EV_EOF | EV_NODATA);
		ready = 1;
	}

	if (!ready) {
		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
		if (kn->kn_data < 0)
			kn->kn_data = 0;
	}

#if 0
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
#endif

	if (!ready)
		ready = kn->kn_data >= PIPE_BUF;

	return (ready);
}