xref: /dragonfly/sys/kern/sys_pipe.c (revision 145205d1)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23 
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68 
69 #include <machine/cpufunc.h>
70 
71 struct pipegdlock {
72 	struct mtx	mtx;
73 } __cachealign;
74 
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 		struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 		struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 		struct ucred *cred, struct sysmsg *msg);
88 
89 static struct fileops pipeops = {
90 	.fo_read = pipe_read,
91 	.fo_write = pipe_write,
92 	.fo_ioctl = pipe_ioctl,
93 	.fo_kqfilter = pipe_kqfilter,
94 	.fo_stat = pipe_stat,
95 	.fo_close = pipe_close,
96 	.fo_shutdown = pipe_shutdown
97 };
98 
99 static void	filt_pipedetach(struct knote *kn);
100 static int	filt_piperead(struct knote *kn, long hint);
101 static int	filt_pipewrite(struct knote *kn, long hint);
102 
103 static struct filterops pipe_rfiltops =
104 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 static struct filterops pipe_wfiltops =
106 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107 
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109 
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111 
112 static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 static struct pipegdlock *pipe_gdlocks;
114 
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118 
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127 
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the write completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 static int pipe_delay = 4000;	/* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143 
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151 	size_t mbytes = kmem_lim_size();
152 	int n;
153 
154 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155 		if (mbytes >= 7 * 1024)
156 			pipe_maxcache *= 2;
157 		if (mbytes >= 15 * 1024)
158 			pipe_maxcache *= 2;
159 	}
160 	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
161 			     M_PIPE, M_WAITOK | M_ZERO);
162 	for (n = 0; n < ncpus; ++n)
163 		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
164 }
165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
166 
167 static void pipeclose (struct pipe *pipe,
168 		struct pipebuf *pbr, struct pipebuf *pbw);
169 static void pipe_free_kmem (struct pipebuf *buf);
170 static int pipe_create (struct pipe **pipep);
171 
172 /*
173  * Test and clear the specified flag, wakeup(pb) if it was set.
174  * This function must also act as a memory barrier.
175  */
176 static __inline void
177 pipesignal(struct pipebuf *pb, uint32_t flags)
178 {
179 	uint32_t oflags;
180 	uint32_t nflags;
181 
182 	for (;;) {
183 		oflags = pb->state;
184 		cpu_ccfence();
185 		nflags = oflags & ~flags;
186 		if (atomic_cmpset_int(&pb->state, oflags, nflags))
187 			break;
188 	}
189 	if (oflags & flags)
190 		wakeup(pb);
191 }
192 
193 /*
194  *
195  */
196 static __inline void
197 pipewakeup(struct pipebuf *pb, int dosigio)
198 {
199 	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
200 		lwkt_gettoken(&sigio_token);
201 		pgsigio(pb->sigio, SIGIO, 0);
202 		lwkt_reltoken(&sigio_token);
203 	}
204 	KNOTE(&pb->kq.ki_note, 0);
205 }
206 
207 /*
208  * These routines are called before and after a UIO.  The UIO
209  * may block, causing our held tokens to be lost temporarily.
210  *
211  * We use these routines to serialize reads against other reads
212  * and writes against other writes.
213  *
214  * The appropriate token is held on entry so *ipp does not race.
215  */
216 static __inline int
217 pipe_start_uio(int *ipp)
218 {
219 	int error;
220 
221 	while (*ipp) {
222 		*ipp = -1;
223 		error = tsleep(ipp, PCATCH, "pipexx", 0);
224 		if (error)
225 			return (error);
226 	}
227 	*ipp = 1;
228 	return (0);
229 }
230 
231 static __inline void
232 pipe_end_uio(int *ipp)
233 {
234 	if (*ipp < 0) {
235 		*ipp = 0;
236 		wakeup(ipp);
237 	} else {
238 		KKASSERT(*ipp > 0);
239 		*ipp = 0;
240 	}
241 }
242 
243 /*
244  * The pipe system call for the DTYPE_PIPE type of pipes
245  *
246  * pipe_args(int dummy)
247  *
248  * MPSAFE
249  */
250 int
251 sys_pipe(struct pipe_args *uap)
252 {
253 	return kern_pipe(uap->sysmsg_fds, 0);
254 }
255 
256 int
257 sys_pipe2(struct pipe2_args *uap)
258 {
259 	return kern_pipe(uap->sysmsg_fds, uap->flags);
260 }
261 
262 int
263 kern_pipe(long *fds, int flags)
264 {
265 	struct thread *td = curthread;
266 	struct filedesc *fdp = td->td_proc->p_fd;
267 	struct file *rf, *wf;
268 	struct pipe *pipe;
269 	int fd1, fd2, error;
270 
271 	pipe = NULL;
272 	if (pipe_create(&pipe)) {
273 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
274 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
275 		return (ENFILE);
276 	}
277 
278 	error = falloc(td->td_lwp, &rf, &fd1);
279 	if (error) {
280 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
281 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
282 		return (error);
283 	}
284 	fds[0] = fd1;
285 
286 	/*
287 	 * Warning: once we've gotten past allocation of the fd for the
288 	 * read-side, we can only drop the read side via fdrop() in order
289 	 * to avoid races against processes which manage to dup() the read
290 	 * side while we are blocked trying to allocate the write side.
291 	 */
292 	rf->f_type = DTYPE_PIPE;
293 	rf->f_flag = FREAD | FWRITE;
294 	rf->f_ops = &pipeops;
295 	rf->f_data = (void *)((intptr_t)pipe | 0);
296 	if (flags & O_NONBLOCK)
297 		rf->f_flag |= O_NONBLOCK;
298 	if (flags & O_CLOEXEC)
299 		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
300 
301 	error = falloc(td->td_lwp, &wf, &fd2);
302 	if (error) {
303 		fsetfd(fdp, NULL, fd1);
304 		fdrop(rf);
305 		/* pipeA has been closed by fdrop() */
306 		/* close pipeB here */
307 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
308 		return (error);
309 	}
310 	wf->f_type = DTYPE_PIPE;
311 	wf->f_flag = FREAD | FWRITE;
312 	wf->f_ops = &pipeops;
313 	wf->f_data = (void *)((intptr_t)pipe | 1);
314 	if (flags & O_NONBLOCK)
315 		wf->f_flag |= O_NONBLOCK;
316 	if (flags & O_CLOEXEC)
317 		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
318 
319 	fds[1] = fd2;
320 
321 	/*
322 	 * Once activated the peer relationship remains valid until
323 	 * both sides are closed.
324 	 */
325 	fsetfd(fdp, rf, fd1);
326 	fsetfd(fdp, wf, fd2);
327 	fdrop(rf);
328 	fdrop(wf);
329 
330 	return (0);
331 }
332 
333 /*
334  * [re]allocates KVA for the pipe's circular buffer.  The space is
335  * pageable.  Called twice to setup full-duplex communications.
336  *
337  * NOTE: Independent vm_object's are used to improve performance.
338  *
339  * Returns 0 on success, ENOMEM on failure.
340  */
341 static int
342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
343 {
344 	struct vm_object *object;
345 	caddr_t buffer;
346 	vm_pindex_t npages;
347 	int error;
348 
349 	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
350 	if (size < 16384)
351 		size = 16384;
352 	if (size > 1024*1024)
353 		size = 1024*1024;
354 
355 	npages = round_page(size) / PAGE_SIZE;
356 	object = pb->object;
357 
358 	/*
359 	 * [re]create the object if necessary and reserve space for it
360 	 * in the kernel_map.  The object and memory are pageable.  On
361 	 * success, free the old resources before assigning the new
362 	 * ones.
363 	 */
364 	if (object == NULL || object->size != npages) {
365 		object = vm_object_allocate(OBJT_DEFAULT, npages);
366 		buffer = (caddr_t)vm_map_min(&kernel_map);
367 
368 		error = vm_map_find(&kernel_map, object, NULL,
369 				    0, (vm_offset_t *)&buffer, size,
370 				    PAGE_SIZE, TRUE,
371 				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
372 				    VM_PROT_ALL, VM_PROT_ALL, 0);
373 
374 		if (error != KERN_SUCCESS) {
375 			vm_object_deallocate(object);
376 			return (ENOMEM);
377 		}
378 		pipe_free_kmem(pb);
379 		pb->object = object;
380 		pb->buffer = buffer;
381 		pb->size = size;
382 	}
383 	pb->rindex = 0;
384 	pb->windex = 0;
385 
386 	return (0);
387 }
388 
389 /*
390  * Initialize and allocate VM and memory for pipe, pulling the pipe from
391  * our per-cpu cache if possible.
392  *
393  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
394  * must still deallocate the pipe on failure.
395  */
396 static int
397 pipe_create(struct pipe **pipep)
398 {
399 	globaldata_t gd = mycpu;
400 	struct pipe *pipe;
401 	int error;
402 
403 	if ((pipe = gd->gd_pipeq) != NULL) {
404 		gd->gd_pipeq = pipe->next;
405 		--gd->gd_pipeqcount;
406 		pipe->next = NULL;
407 	} else {
408 		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
409 		lwkt_token_init(&pipe->bufferA.rlock, "piper");
410 		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
411 		lwkt_token_init(&pipe->bufferB.rlock, "piper");
412 		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
413 	}
414 	*pipep = pipe;
415 	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
416 		return (error);
417 	}
418 	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
419 		return (error);
420 	}
421 	vfs_timestamp(&pipe->ctime);
422 	pipe->bufferA.atime = pipe->ctime;
423 	pipe->bufferA.mtime = pipe->ctime;
424 	pipe->bufferB.atime = pipe->ctime;
425 	pipe->bufferB.mtime = pipe->ctime;
426 	pipe->open_count = 2;
427 
428 	return (0);
429 }
430 
431 /*
432  * Read data from a pipe
433  */
434 static int
435 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
436 {
437 	struct pipebuf *rpb;
438 	struct pipebuf *wpb;
439 	struct pipe *pipe;
440 	size_t nread = 0;
441 	size_t size;	/* total bytes available */
442 	size_t nsize;	/* total bytes to read */
443 	size_t rindex;	/* contiguous bytes available */
444 	int notify_writer;
445 	int bigread;
446 	int bigcount;
447 	int error;
448 	int nbio;
449 
450 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
451 	if ((intptr_t)fp->f_data & 1) {
452 		rpb = &pipe->bufferB;
453 		wpb = &pipe->bufferA;
454 	} else {
455 		rpb = &pipe->bufferA;
456 		wpb = &pipe->bufferB;
457 	}
458 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
459 
460 	if (uio->uio_resid == 0)
461 		return(0);
462 
463 	/*
464 	 * Calculate nbio
465 	 */
466 	if (fflags & O_FBLOCKING)
467 		nbio = 0;
468 	else if (fflags & O_FNONBLOCKING)
469 		nbio = 1;
470 	else if (fp->f_flag & O_NONBLOCK)
471 		nbio = 1;
472 	else
473 		nbio = 0;
474 
475 	/*
476 	 * 'quick' NBIO test before things get expensive.
477 	 */
478 	if (nbio && rpb->rindex == rpb->windex &&
479 	    (rpb->state & PIPE_REOF) == 0) {
480 		return EAGAIN;
481 	}
482 
483 	/*
484 	 * Reads are serialized.  Note however that buffer.buffer and
485 	 * buffer.size can change out from under us when the number
486 	 * of bytes in the buffer are zero due to the write-side doing a
487 	 * pipespace().
488 	 */
489 	lwkt_gettoken(&rpb->rlock);
490 	error = pipe_start_uio(&rpb->rip);
491 	if (error) {
492 		lwkt_reltoken(&rpb->rlock);
493 		return (error);
494 	}
495 	notify_writer = 0;
496 
497 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
498 	bigcount = 10;
499 
500 	while (uio->uio_resid) {
501 		/*
502 		 * Don't hog the cpu.
503 		 */
504 		if (bigread && --bigcount == 0) {
505 			lwkt_user_yield();
506 			bigcount = 10;
507 			if (CURSIG(curthread->td_lwp)) {
508 				error = EINTR;
509 				break;
510 			}
511 		}
512 
513 		/*
514 		 * lfence required to avoid read-reordering of buffer
515 		 * contents prior to validation of size.
516 		 */
517 		size = rpb->windex - rpb->rindex;
518 		cpu_lfence();
519 		if (size) {
520 			rindex = rpb->rindex & (rpb->size - 1);
521 			nsize = size;
522 			if (nsize > rpb->size - rindex)
523 				nsize = rpb->size - rindex;
524 			nsize = szmin(nsize, uio->uio_resid);
525 
526 			/*
527 			 * Limit how much we move in one go so we have a
528 			 * chance to kick the writer while data is still
529 			 * available in the pipe.  This avoids getting into
530 			 * a ping-pong with the writer.
531 			 */
532 			if (nsize > (rpb->size >> 1))
533 				nsize = rpb->size >> 1;
534 
535 			error = uiomove(&rpb->buffer[rindex], nsize, uio);
536 			if (error)
537 				break;
538 			rpb->rindex += nsize;
539 			nread += nsize;
540 
541 			/*
542 			 * If the FIFO is still over half full just continue
543 			 * and do not try to notify the writer yet.  If
544 			 * less than half full notify any waiting writer.
545 			 */
546 			if (size - nsize > (rpb->size >> 1)) {
547 				notify_writer = 0;
548 			} else {
549 				notify_writer = 1;
550 				pipesignal(rpb, PIPE_WANTW);
551 			}
552 			continue;
553 		}
554 
555 		/*
556 		 * If the "write-side" was blocked we wake it up.  This code
557 		 * is reached when the buffer is completely emptied.
558 		 */
559 		pipesignal(rpb, PIPE_WANTW);
560 
561 		/*
562 		 * Pick up our copy loop again if the writer sent data to
563 		 * us while we were messing around.
564 		 *
565 		 * On a SMP box poll up to pipe_delay nanoseconds for new
566 		 * data.  Typically a value of 2000 to 4000 is sufficient
567 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
568 		 * is used for synchronous communications with small packets,
569 		 * and 8000 or so (8uS) will pipeline large buffer xfers
570 		 * between cpus over a pipe.
571 		 *
572 		 * For synchronous communications a hit means doing a
573 		 * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
574 		 * where as miss requiring a tsleep/wakeup sequence
575 		 * will take 7uS or more.
576 		 */
577 		if (rpb->windex != rpb->rindex)
578 			continue;
579 
580 #ifdef _RDTSC_SUPPORTED_
581 		if (pipe_delay) {
582 			int64_t tsc_target;
583 			int good = 0;
584 
585 			tsc_target = tsc_get_target(pipe_delay);
586 			while (tsc_test_target(tsc_target) == 0) {
587 				cpu_lfence();
588 				if (rpb->windex != rpb->rindex) {
589 					good = 1;
590 					break;
591 				}
592 				cpu_pause();
593 			}
594 			if (good)
595 				continue;
596 		}
597 #endif
598 
599 		/*
600 		 * Detect EOF condition, do not set error.
601 		 */
602 		if (rpb->state & PIPE_REOF)
603 			break;
604 
605 		/*
606 		 * Break if some data was read, or if this was a non-blocking
607 		 * read.
608 		 */
609 		if (nread > 0)
610 			break;
611 
612 		if (nbio) {
613 			error = EAGAIN;
614 			break;
615 		}
616 
617 		/*
618 		 * Last chance, interlock with WANTR
619 		 */
620 		tsleep_interlock(rpb, PCATCH);
621 		atomic_set_int(&rpb->state, PIPE_WANTR);
622 
623 		/*
624 		 * Retest bytes available after memory barrier above.
625 		 */
626 		size = rpb->windex - rpb->rindex;
627 		if (size)
628 			continue;
629 
630 		/*
631 		 * Retest EOF after memory barrier above.
632 		 */
633 		if (rpb->state & PIPE_REOF)
634 			break;
635 
636 		/*
637 		 * Wait for more data or state change
638 		 */
639 		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
640 		if (error)
641 			break;
642 	}
643 	pipe_end_uio(&rpb->rip);
644 
645 	/*
646 	 * Uptime last access time
647 	 */
648 	if (error == 0 && nread)
649 		vfs_timestamp(&rpb->atime);
650 
651 	/*
652 	 * If we drained the FIFO more then half way then handle
653 	 * write blocking hysteresis.
654 	 *
655 	 * Note that PIPE_WANTW cannot be set by the writer without
656 	 * it holding both rlock and wlock, so we can test it
657 	 * while holding just rlock.
658 	 */
659 	if (notify_writer) {
660 		/*
661 		 * Synchronous blocking is done on the pipe involved
662 		 */
663 		pipesignal(rpb, PIPE_WANTW);
664 
665 		/*
666 		 * But we may also have to deal with a kqueue which is
667 		 * stored on the same pipe as its descriptor, so a
668 		 * EVFILT_WRITE event waiting for our side to drain will
669 		 * be on the other side.
670 		 */
671 		pipewakeup(wpb, 0);
672 	}
673 	/*size = rpb->windex - rpb->rindex;*/
674 	lwkt_reltoken(&rpb->rlock);
675 
676 	return (error);
677 }
678 
679 static int
680 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
681 {
682 	struct pipebuf *rpb;
683 	struct pipebuf *wpb;
684 	struct pipe *pipe;
685 	size_t windex;
686 	size_t space;
687 	size_t wcount;
688 	size_t orig_resid;
689 	int bigwrite;
690 	int bigcount;
691 	int error;
692 	int nbio;
693 
694 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
695 	if ((intptr_t)fp->f_data & 1) {
696 		rpb = &pipe->bufferB;
697 		wpb = &pipe->bufferA;
698 	} else {
699 		rpb = &pipe->bufferA;
700 		wpb = &pipe->bufferB;
701 	}
702 
703 	/*
704 	 * Calculate nbio
705 	 */
706 	if (fflags & O_FBLOCKING)
707 		nbio = 0;
708 	else if (fflags & O_FNONBLOCKING)
709 		nbio = 1;
710 	else if (fp->f_flag & O_NONBLOCK)
711 		nbio = 1;
712 	else
713 		nbio = 0;
714 
715 	/*
716 	 * 'quick' NBIO test before things get expensive.
717 	 */
718 	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
719 	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
720 		return EAGAIN;
721 	}
722 
723 	/*
724 	 * Writes go to the peer.  The peer will always exist.
725 	 */
726 	lwkt_gettoken(&wpb->wlock);
727 	if (wpb->state & PIPE_WEOF) {
728 		lwkt_reltoken(&wpb->wlock);
729 		return (EPIPE);
730 	}
731 
732 	/*
733 	 * Degenerate case (EPIPE takes prec)
734 	 */
735 	if (uio->uio_resid == 0) {
736 		lwkt_reltoken(&wpb->wlock);
737 		return(0);
738 	}
739 
740 	/*
741 	 * Writes are serialized (start_uio must be called with wlock)
742 	 */
743 	error = pipe_start_uio(&wpb->wip);
744 	if (error) {
745 		lwkt_reltoken(&wpb->wlock);
746 		return (error);
747 	}
748 
749 	orig_resid = uio->uio_resid;
750 	wcount = 0;
751 
752 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
753 	bigcount = 10;
754 
755 	while (uio->uio_resid) {
756 		if (wpb->state & PIPE_WEOF) {
757 			error = EPIPE;
758 			break;
759 		}
760 
761 		/*
762 		 * Don't hog the cpu.
763 		 */
764 		if (bigwrite && --bigcount == 0) {
765 			lwkt_user_yield();
766 			bigcount = 10;
767 			if (CURSIG(curthread->td_lwp)) {
768 				error = EINTR;
769 				break;
770 			}
771 		}
772 
773 		windex = wpb->windex & (wpb->size - 1);
774 		space = wpb->size - (wpb->windex - wpb->rindex);
775 
776 		/*
777 		 * Writes of size <= PIPE_BUF must be atomic.
778 		 */
779 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
780 			space = 0;
781 
782 		/*
783 		 * Write to fill, read size handles write hysteresis.  Also
784 		 * additional restrictions can cause select-based non-blocking
785 		 * writes to spin.
786 		 */
787 		if (space > 0) {
788 			size_t segsize;
789 
790 			/*
791 			 * We want to notify a potentially waiting reader
792 			 * before we exhaust the write buffer for SMP
793 			 * pipelining.  Otherwise the write/read will begin
794 			 * to ping-pong.
795 			 */
796 			space = szmin(space, uio->uio_resid);
797 			if (space > (wpb->size >> 1))
798 				space = (wpb->size >> 1);
799 
800 			/*
801 			 * First segment to transfer is minimum of
802 			 * transfer size and contiguous space in
803 			 * pipe buffer.  If first segment to transfer
804 			 * is less than the transfer size, we've got
805 			 * a wraparound in the buffer.
806 			 */
807 			segsize = wpb->size - windex;
808 			if (segsize > space)
809 				segsize = space;
810 
811 			/*
812 			 * If this is the first loop and the reader is
813 			 * blocked, do a preemptive wakeup of the reader.
814 			 *
815 			 * On SMP the IPI latency plus the wlock interlock
816 			 * on the reader side is the fastest way to get the
817 			 * reader going.  (The scheduler will hard loop on
818 			 * lock tokens).
819 			 */
820 			if (wcount == 0)
821 				pipesignal(wpb, PIPE_WANTR);
822 
823 			/*
824 			 * Transfer segment, which may include a wrap-around.
825 			 * Update windex to account for both all in one go
826 			 * so the reader can read() the data atomically.
827 			 */
828 			error = uiomove(&wpb->buffer[windex], segsize, uio);
829 			if (error == 0 && segsize < space) {
830 				segsize = space - segsize;
831 				error = uiomove(&wpb->buffer[0], segsize, uio);
832 			}
833 			if (error)
834 				break;
835 
836 			/*
837 			 * Memory fence prior to windex updating (note: not
838 			 * needed so this is a NOP on Intel).
839 			 */
840 			cpu_sfence();
841 			wpb->windex += space;
842 
843 			/*
844 			 * Signal reader
845 			 */
846 			if (wcount != 0)
847 				pipesignal(wpb, PIPE_WANTR);
848 			wcount += space;
849 			continue;
850 		}
851 
852 		/*
853 		 * Wakeup any pending reader
854 		 */
855 		pipesignal(wpb, PIPE_WANTR);
856 
857 		/*
858 		 * don't block on non-blocking I/O
859 		 */
860 		if (nbio) {
861 			error = EAGAIN;
862 			break;
863 		}
864 
865 #ifdef _RDTSC_SUPPORTED_
866 		if (pipe_delay) {
867 			int64_t tsc_target;
868 			int good = 0;
869 
870 			tsc_target = tsc_get_target(pipe_delay);
871 			while (tsc_test_target(tsc_target) == 0) {
872 				cpu_lfence();
873 				space = wpb->size - (wpb->windex - wpb->rindex);
874 				if ((space < uio->uio_resid) &&
875 				    (orig_resid <= PIPE_BUF)) {
876 					space = 0;
877 				}
878 				if (space) {
879 					good = 1;
880 					break;
881 				}
882 				cpu_pause();
883 			}
884 			if (good)
885 				continue;
886 		}
887 #endif
888 
889 		/*
890 		 * Interlocked test.   Atomic op enforces the memory barrier.
891 		 */
892 		tsleep_interlock(wpb, PCATCH);
893 		atomic_set_int(&wpb->state, PIPE_WANTW);
894 
895 		/*
896 		 * Retest space available after memory barrier above.
897 		 * Writes of size <= PIPE_BUF must be atomic.
898 		 */
899 		space = wpb->size - (wpb->windex - wpb->rindex);
900 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
901 			space = 0;
902 
903 		/*
904 		 * Retest EOF after memory barrier above.
905 		 */
906 		if (wpb->state & PIPE_WEOF) {
907 			error = EPIPE;
908 			break;
909 		}
910 
911 		/*
912 		 * We have no more space and have something to offer,
913 		 * wake up select/poll/kq.
914 		 */
915 		if (space == 0) {
916 			pipewakeup(wpb, 1);
917 			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
918 		}
919 
920 		/*
921 		 * Break out if we errored or the read side wants us to go
922 		 * away.
923 		 */
924 		if (error)
925 			break;
926 		if (wpb->state & PIPE_WEOF) {
927 			error = EPIPE;
928 			break;
929 		}
930 	}
931 	pipe_end_uio(&wpb->wip);
932 
933 	/*
934 	 * If we have put any characters in the buffer, we wake up
935 	 * the reader.
936 	 *
937 	 * Both rlock and wlock are required to be able to modify pipe_state.
938 	 */
939 	if (wpb->windex != wpb->rindex) {
940 		pipesignal(wpb, PIPE_WANTR);
941 		pipewakeup(wpb, 1);
942 	}
943 
944 	/*
945 	 * Don't return EPIPE if I/O was successful
946 	 */
947 	if ((wpb->rindex == wpb->windex) &&
948 	    (uio->uio_resid == 0) &&
949 	    (error == EPIPE)) {
950 		error = 0;
951 	}
952 
953 	if (error == 0)
954 		vfs_timestamp(&wpb->mtime);
955 
956 	/*
957 	 * We have something to offer,
958 	 * wake up select/poll/kq.
959 	 */
960 	/*space = wpb->windex - wpb->rindex;*/
961 	lwkt_reltoken(&wpb->wlock);
962 
963 	return (error);
964 }
965 
966 /*
967  * we implement a very minimal set of ioctls for compatibility with sockets.
968  */
969 static int
970 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
971 	   struct ucred *cred, struct sysmsg *msg)
972 {
973 	struct pipebuf *rpb;
974 	struct pipe *pipe;
975 	int error;
976 
977 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
978 	if ((intptr_t)fp->f_data & 1) {
979 		rpb = &pipe->bufferB;
980 	} else {
981 		rpb = &pipe->bufferA;
982 	}
983 
984 	lwkt_gettoken(&rpb->rlock);
985 	lwkt_gettoken(&rpb->wlock);
986 
987 	switch (cmd) {
988 	case FIOASYNC:
989 		if (*(int *)data) {
990 			atomic_set_int(&rpb->state, PIPE_ASYNC);
991 		} else {
992 			atomic_clear_int(&rpb->state, PIPE_ASYNC);
993 		}
994 		error = 0;
995 		break;
996 	case FIONREAD:
997 		*(int *)data = (int)(rpb->windex - rpb->rindex);
998 		error = 0;
999 		break;
1000 	case FIOSETOWN:
1001 		error = fsetown(*(int *)data, &rpb->sigio);
1002 		break;
1003 	case FIOGETOWN:
1004 		*(int *)data = fgetown(&rpb->sigio);
1005 		error = 0;
1006 		break;
1007 	case TIOCSPGRP:
1008 		/* This is deprecated, FIOSETOWN should be used instead. */
1009 		error = fsetown(-(*(int *)data), &rpb->sigio);
1010 		break;
1011 
1012 	case TIOCGPGRP:
1013 		/* This is deprecated, FIOGETOWN should be used instead. */
1014 		*(int *)data = -fgetown(&rpb->sigio);
1015 		error = 0;
1016 		break;
1017 	default:
1018 		error = ENOTTY;
1019 		break;
1020 	}
1021 	lwkt_reltoken(&rpb->wlock);
1022 	lwkt_reltoken(&rpb->rlock);
1023 
1024 	return (error);
1025 }
1026 
1027 /*
1028  * MPSAFE
1029  */
1030 static int
1031 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1032 {
1033 	struct pipebuf *rpb;
1034 	struct pipe *pipe;
1035 
1036 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1037 	if ((intptr_t)fp->f_data & 1) {
1038 		rpb = &pipe->bufferB;
1039 	} else {
1040 		rpb = &pipe->bufferA;
1041 	}
1042 
1043 	bzero((caddr_t)ub, sizeof(*ub));
1044 	ub->st_mode = S_IFIFO;
1045 	ub->st_blksize = rpb->size;
1046 	ub->st_size = rpb->windex - rpb->rindex;
1047 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1048 	ub->st_atimespec = rpb->atime;
1049 	ub->st_mtimespec = rpb->mtime;
1050 	ub->st_ctimespec = pipe->ctime;
1051 	/*
1052 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1053 	 * st_flags, st_gen.
1054 	 * XXX (st_dev, st_ino) should be unique.
1055 	 */
1056 
1057 	return (0);
1058 }
1059 
1060 static int
1061 pipe_close(struct file *fp)
1062 {
1063 	struct pipebuf *rpb;
1064 	struct pipebuf *wpb;
1065 	struct pipe *pipe;
1066 
1067 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1068 	if ((intptr_t)fp->f_data & 1) {
1069 		rpb = &pipe->bufferB;
1070 		wpb = &pipe->bufferA;
1071 	} else {
1072 		rpb = &pipe->bufferA;
1073 		wpb = &pipe->bufferB;
1074 	}
1075 
1076 	fp->f_ops = &badfileops;
1077 	fp->f_data = NULL;
1078 	funsetown(&rpb->sigio);
1079 	pipeclose(pipe, rpb, wpb);
1080 
1081 	return (0);
1082 }
1083 
1084 /*
1085  * Shutdown one or both directions of a full-duplex pipe.
1086  */
1087 static int
1088 pipe_shutdown(struct file *fp, int how)
1089 {
1090 	struct pipebuf *rpb;
1091 	struct pipebuf *wpb;
1092 	struct pipe *pipe;
1093 	int error = EPIPE;
1094 
1095 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1096 	if ((intptr_t)fp->f_data & 1) {
1097 		rpb = &pipe->bufferB;
1098 		wpb = &pipe->bufferA;
1099 	} else {
1100 		rpb = &pipe->bufferA;
1101 		wpb = &pipe->bufferB;
1102 	}
1103 
1104 	/*
1105 	 * We modify pipe_state on both pipes, which means we need
1106 	 * all four tokens!
1107 	 */
1108 	lwkt_gettoken(&rpb->rlock);
1109 	lwkt_gettoken(&rpb->wlock);
1110 	lwkt_gettoken(&wpb->rlock);
1111 	lwkt_gettoken(&wpb->wlock);
1112 
1113 	switch(how) {
1114 	case SHUT_RDWR:
1115 	case SHUT_RD:
1116 		/*
1117 		 * EOF on my reads and peer writes
1118 		 */
1119 		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1120 		if (rpb->state & PIPE_WANTR) {
1121 			rpb->state &= ~PIPE_WANTR;
1122 			wakeup(rpb);
1123 		}
1124 		if (rpb->state & PIPE_WANTW) {
1125 			rpb->state &= ~PIPE_WANTW;
1126 			wakeup(rpb);
1127 		}
1128 		error = 0;
1129 		if (how == SHUT_RD)
1130 			break;
1131 		/* fall through */
1132 	case SHUT_WR:
1133 		/*
1134 		 * EOF on peer reads and my writes
1135 		 */
1136 		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1137 		if (wpb->state & PIPE_WANTR) {
1138 			wpb->state &= ~PIPE_WANTR;
1139 			wakeup(wpb);
1140 		}
1141 		if (wpb->state & PIPE_WANTW) {
1142 			wpb->state &= ~PIPE_WANTW;
1143 			wakeup(wpb);
1144 		}
1145 		error = 0;
1146 		break;
1147 	}
1148 	pipewakeup(rpb, 1);
1149 	pipewakeup(wpb, 1);
1150 
1151 	lwkt_reltoken(&wpb->wlock);
1152 	lwkt_reltoken(&wpb->rlock);
1153 	lwkt_reltoken(&rpb->wlock);
1154 	lwkt_reltoken(&rpb->rlock);
1155 
1156 	return (error);
1157 }
1158 
1159 /*
1160  * Destroy the pipe buffer.
1161  */
1162 static void
1163 pipe_free_kmem(struct pipebuf *pb)
1164 {
1165 	if (pb->buffer != NULL) {
1166 		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1167 		pb->buffer = NULL;
1168 		pb->object = NULL;
1169 	}
1170 }
1171 
1172 /*
1173  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1174  * and writing on wpb.  This routine must be called twice with the pipebufs
1175  * reversed to close both directions.
1176  */
1177 static void
1178 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1179 {
1180 	globaldata_t gd;
1181 
1182 	if (pipe == NULL)
1183 		return;
1184 
1185 	/*
1186 	 * We need both the read and write tokens to modify pipe_state.
1187 	 */
1188 	lwkt_gettoken(&rpb->rlock);
1189 	lwkt_gettoken(&rpb->wlock);
1190 
1191 	/*
1192 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1193 	 * wakeup anyone blocked on our pipe.  No action if our side
1194 	 * is already closed.
1195 	 */
1196 	if (rpb->state & PIPE_CLOSED) {
1197 		lwkt_reltoken(&rpb->wlock);
1198 		lwkt_reltoken(&rpb->rlock);
1199 		return;
1200 	}
1201 
1202 	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1203 	pipewakeup(rpb, 1);
1204 	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1205 		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1206 		wakeup(rpb);
1207 	}
1208 	lwkt_reltoken(&rpb->wlock);
1209 	lwkt_reltoken(&rpb->rlock);
1210 
1211 	/*
1212 	 * Disconnect from peer.
1213 	 */
1214 	lwkt_gettoken(&wpb->rlock);
1215 	lwkt_gettoken(&wpb->wlock);
1216 
1217 	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1218 	pipewakeup(wpb, 1);
1219 	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1220 		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1221 		wakeup(wpb);
1222 	}
1223 	if (SLIST_FIRST(&wpb->kq.ki_note))
1224 		KNOTE(&wpb->kq.ki_note, 0);
1225 	lwkt_reltoken(&wpb->wlock);
1226 	lwkt_reltoken(&wpb->rlock);
1227 
1228 	/*
1229 	 * Free resources once both sides are closed.  We maintain a pcpu
1230 	 * cache to improve performance, so the actual tear-down case is
1231 	 * limited to bulk situations.
1232 	 *
1233 	 * However, the bulk tear-down case can cause intense contention
1234 	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1235 	 * of processes are killed at the same time.  To deal with this we
1236 	 * use a pcpu mutex to maintain concurrency but also limit the
1237 	 * number of threads banging on the map and pmap.
1238 	 *
1239 	 * We use the mtx mechanism instead of the lockmgr mechanism because
1240 	 * the mtx mechanism utilizes a queued design which will not break
1241 	 * down in the face of thousands to hundreds of thousands of
1242 	 * processes trying to free pipes simultaneously.  The lockmgr
1243 	 * mechanism will wind up waking them all up each time a lock
1244 	 * cycles.
1245 	 */
1246 	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1247 		gd = mycpu;
1248 		if (gd->gd_pipeqcount >= pipe_maxcache) {
1249 			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1250 			pipe_free_kmem(rpb);
1251 			pipe_free_kmem(wpb);
1252 			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1253 			kfree(pipe, M_PIPE);
1254 		} else {
1255 			rpb->state = 0;
1256 			wpb->state = 0;
1257 			pipe->next = gd->gd_pipeq;
1258 			gd->gd_pipeq = pipe;
1259 			++gd->gd_pipeqcount;
1260 		}
1261 	}
1262 }
1263 
1264 static int
1265 pipe_kqfilter(struct file *fp, struct knote *kn)
1266 {
1267 	struct pipebuf *rpb;
1268 	struct pipebuf *wpb;
1269 	struct pipe *pipe;
1270 
1271 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1272 	if ((intptr_t)fp->f_data & 1) {
1273 		rpb = &pipe->bufferB;
1274 		wpb = &pipe->bufferA;
1275 	} else {
1276 		rpb = &pipe->bufferA;
1277 		wpb = &pipe->bufferB;
1278 	}
1279 
1280 	switch (kn->kn_filter) {
1281 	case EVFILT_READ:
1282 		kn->kn_fop = &pipe_rfiltops;
1283 		break;
1284 	case EVFILT_WRITE:
1285 		kn->kn_fop = &pipe_wfiltops;
1286 		if (wpb->state & PIPE_CLOSED) {
1287 			/* other end of pipe has been closed */
1288 			return (EPIPE);
1289 		}
1290 		break;
1291 	default:
1292 		return (EOPNOTSUPP);
1293 	}
1294 
1295 	if (rpb == &pipe->bufferA)
1296 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1297 	else
1298 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1299 
1300 	knote_insert(&rpb->kq.ki_note, kn);
1301 
1302 	return (0);
1303 }
1304 
1305 static void
1306 filt_pipedetach(struct knote *kn)
1307 {
1308 	struct pipebuf *rpb;
1309 	struct pipebuf *wpb;
1310 	struct pipe *pipe;
1311 
1312 	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1313 	if ((intptr_t)kn->kn_hook & 1) {
1314 		rpb = &pipe->bufferB;
1315 		wpb = &pipe->bufferA;
1316 	} else {
1317 		rpb = &pipe->bufferA;
1318 		wpb = &pipe->bufferB;
1319 	}
1320 	knote_remove(&rpb->kq.ki_note, kn);
1321 }
1322 
1323 /*ARGSUSED*/
1324 static int
1325 filt_piperead(struct knote *kn, long hint)
1326 {
1327 	struct pipebuf *rpb;
1328 	struct pipebuf *wpb;
1329 	struct pipe *pipe;
1330 	int ready = 0;
1331 
1332 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1333 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1334 		rpb = &pipe->bufferB;
1335 		wpb = &pipe->bufferA;
1336 	} else {
1337 		rpb = &pipe->bufferA;
1338 		wpb = &pipe->bufferB;
1339 	}
1340 
1341 	lwkt_gettoken(&rpb->rlock);
1342 	lwkt_gettoken(&rpb->wlock);
1343 
1344 	kn->kn_data = rpb->windex - rpb->rindex;
1345 
1346 	if (rpb->state & PIPE_REOF) {
1347 		/*
1348 		 * Only set NODATA if all data has been exhausted
1349 		 */
1350 		if (kn->kn_data == 0)
1351 			kn->kn_flags |= EV_NODATA;
1352 		kn->kn_flags |= EV_EOF;
1353 		ready = 1;
1354 	}
1355 
1356 	lwkt_reltoken(&rpb->wlock);
1357 	lwkt_reltoken(&rpb->rlock);
1358 
1359 	if (!ready)
1360 		ready = kn->kn_data > 0;
1361 
1362 	return (ready);
1363 }
1364 
1365 /*ARGSUSED*/
1366 static int
1367 filt_pipewrite(struct knote *kn, long hint)
1368 {
1369 	struct pipebuf *rpb;
1370 	struct pipebuf *wpb;
1371 	struct pipe *pipe;
1372 	int ready = 0;
1373 
1374 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1375 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1376 		rpb = &pipe->bufferB;
1377 		wpb = &pipe->bufferA;
1378 	} else {
1379 		rpb = &pipe->bufferA;
1380 		wpb = &pipe->bufferB;
1381 	}
1382 
1383 	kn->kn_data = 0;
1384 	if (wpb->state & PIPE_CLOSED) {
1385 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1386 		return (1);
1387 	}
1388 
1389 	lwkt_gettoken(&wpb->rlock);
1390 	lwkt_gettoken(&wpb->wlock);
1391 
1392 	if (wpb->state & PIPE_WEOF) {
1393 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1394 		ready = 1;
1395 	}
1396 
1397 	if (!ready)
1398 		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1399 
1400 	lwkt_reltoken(&wpb->wlock);
1401 	lwkt_reltoken(&wpb->rlock);
1402 
1403 	if (!ready)
1404 		ready = kn->kn_data >= PIPE_BUF;
1405 
1406 	return (ready);
1407 }
1408