xref: /dragonfly/sys/kern/sys_pipe.c (revision 73b5ca6b)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23 
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68 
69 #include <machine/cpufunc.h>
70 
71 struct pipegdlock {
72 	struct mtx	mtx;
73 } __cachealign;
74 
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 		struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 		struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 		struct ucred *cred, struct sysmsg *msg);
88 
89 static struct fileops pipeops = {
90 	.fo_read = pipe_read,
91 	.fo_write = pipe_write,
92 	.fo_ioctl = pipe_ioctl,
93 	.fo_kqfilter = pipe_kqfilter,
94 	.fo_stat = pipe_stat,
95 	.fo_close = pipe_close,
96 	.fo_shutdown = pipe_shutdown
97 };
98 
99 static void	filt_pipedetach(struct knote *kn);
100 static int	filt_piperead(struct knote *kn, long hint);
101 static int	filt_pipewrite(struct knote *kn, long hint);
102 
103 static struct filterops pipe_rfiltops =
104 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 static struct filterops pipe_wfiltops =
106 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107 
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109 
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111 
112 static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 static struct pipegdlock *pipe_gdlocks;
114 
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118 
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127 
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the writer completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 static int pipe_delay = 4000;	/* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
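
/*
 * Example: the knobs above are exported via sysctl and can be adjusted
 * from userland using the kern.pipe.* names defined by the SYSCTL_*
 * declarations (values shown are illustrative only):
 *
 *	sysctl kern.pipe.maxcache=32	# per-cpu pipe structure cache
 *	sysctl kern.pipe.size=65536	# buffer size for new pipe()s
 *	sysctl kern.pipe.delay=2000	# busy-wait window in nanoseconds
 *
 * kern.pipe.delay is only present on kernels built with _RDTSC_SUPPORTED_.
 */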
143 
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151 	size_t mbytes = kmem_lim_size();
152 	int n;
153 
154 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155 		if (mbytes >= 7 * 1024)
156 			pipe_maxcache *= 2;
157 		if (mbytes >= 15 * 1024)
158 			pipe_maxcache *= 2;
159 	}
160 	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
161 			     M_PIPE, M_WAITOK | M_ZERO);
162 	for (n = 0; n < ncpus; ++n)
163 		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
164 }
165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
166 
167 static void pipeclose (struct pipe *pipe,
168 		struct pipebuf *pbr, struct pipebuf *pbw);
169 static void pipe_free_kmem (struct pipebuf *buf);
170 static int pipe_create (struct pipe **pipep);
171 
172 /*
173  * Test and clear the specified flag, wakeup(pb) if it was set.
174  * This function must also act as a memory barrier.
175  */
176 static __inline void
177 pipesignal(struct pipebuf *pb, uint32_t flags)
178 {
179 	uint32_t oflags;
180 	uint32_t nflags;
181 
182 	for (;;) {
183 		oflags = pb->state;
184 		cpu_ccfence();
185 		nflags = oflags & ~flags;
186 		if (atomic_cmpset_int(&pb->state, oflags, nflags))
187 			break;
188 	}
189 	if (oflags & flags)
190 		wakeup(pb);
191 }
192 
193 /*
194  * Post SIGIO (if PIPE_ASYNC is set) and notify kqueue/select/poll waiters.
195  */
196 static __inline void
197 pipewakeup(struct pipebuf *pb, int dosigio)
198 {
199 	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
200 		lwkt_gettoken(&sigio_token);
201 		pgsigio(pb->sigio, SIGIO, 0);
202 		lwkt_reltoken(&sigio_token);
203 	}
204 	KNOTE(&pb->kq.ki_note, 0);
205 }
206 
207 /*
208  * These routines are called before and after a UIO.  The UIO
209  * may block, causing our held tokens to be lost temporarily.
210  *
211  * We use these routines to serialize reads against other reads
212  * and writes against other writes.
213  *
214  * The appropriate token is held on entry so *ipp does not race.
215  */
216 static __inline int
217 pipe_start_uio(int *ipp)
218 {
219 	int error;
220 
221 	while (*ipp) {
222 		*ipp = -1;
223 		error = tsleep(ipp, PCATCH, "pipexx", 0);
224 		if (error)
225 			return (error);
226 	}
227 	*ipp = 1;
228 	return (0);
229 }
230 
231 static __inline void
232 pipe_end_uio(int *ipp)
233 {
234 	if (*ipp < 0) {
235 		*ipp = 0;
236 		wakeup(ipp);
237 	} else {
238 		KKASSERT(*ipp > 0);
239 		*ipp = 0;
240 	}
241 }
242 
243 /*
244  * The pipe system call for the DTYPE_PIPE type of pipes
245  *
246  * pipe_args(int dummy)
247  *
248  * MPSAFE
249  */
250 int
251 sys_pipe(struct pipe_args *uap)
252 {
253 	return kern_pipe(uap->sysmsg_fds, 0);
254 }
255 
256 int
257 sys_pipe2(struct pipe2_args *uap)
258 {
259 	return kern_pipe(uap->sysmsg_fds, uap->flags);
260 }
261 
262 int
263 kern_pipe(long *fds, int flags)
264 {
265 	struct thread *td = curthread;
266 	struct filedesc *fdp = td->td_proc->p_fd;
267 	struct file *rf, *wf;
268 	struct pipe *pipe;
269 	int fd1, fd2, error;
270 
271 	pipe = NULL;
272 	if (pipe_create(&pipe)) {
273 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
274 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
275 		return (ENFILE);
276 	}
277 
278 	error = falloc(td->td_lwp, &rf, &fd1);
279 	if (error) {
280 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
281 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
282 		return (error);
283 	}
284 	fds[0] = fd1;
285 
286 	/*
287 	 * Warning: once we've gotten past allocation of the fd for the
288 	 * read-side, we can only drop the read side via fdrop() in order
289 	 * to avoid races against processes which manage to dup() the read
290 	 * side while we are blocked trying to allocate the write side.
291 	 */
292 	rf->f_type = DTYPE_PIPE;
293 	rf->f_flag = FREAD | FWRITE;
294 	rf->f_ops = &pipeops;
295 	rf->f_data = (void *)((intptr_t)pipe | 0);
296 	if (flags & O_NONBLOCK)
297 		rf->f_flag |= O_NONBLOCK;
298 	if (flags & O_CLOEXEC)
299 		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
300 
301 	error = falloc(td->td_lwp, &wf, &fd2);
302 	if (error) {
303 		fsetfd(fdp, NULL, fd1);
304 		fdrop(rf);
305 		/* pipeA has been closed by fdrop() */
306 		/* close pipeB here */
307 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
308 		return (error);
309 	}
310 	wf->f_type = DTYPE_PIPE;
311 	wf->f_flag = FREAD | FWRITE;
312 	wf->f_ops = &pipeops;
313 	wf->f_data = (void *)((intptr_t)pipe | 1);
314 	if (flags & O_NONBLOCK)
315 		wf->f_flag |= O_NONBLOCK;
316 	if (flags & O_CLOEXEC)
317 		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
318 
319 	fds[1] = fd2;
320 
321 	/*
322 	 * Once activated the peer relationship remains valid until
323 	 * both sides are closed.
324 	 */
325 	fsetfd(fdp, rf, fd1);
326 	fsetfd(fdp, wf, fd2);
327 	fdrop(rf);
328 	fdrop(wf);
329 
330 	return (0);
331 }
332 
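/*
 * Userland usage sketch (illustrative; O_CLOEXEC and O_NONBLOCK are the
 * standard <fcntl.h> flags handled by kern_pipe() above):
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) < 0)
 *		err(1, "pipe2");
 *
 * fds[0] is the read descriptor and fds[1] the write descriptor; both
 * reference the same struct pipe, distinguished by the low bit stored
 * in f_data.
 */
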
333 /*
334  * [re]allocates KVA for the pipe's circular buffer.  The space is
335  * pageable.  Called twice to set up full-duplex communications.
336  *
337  * NOTE: Independent vm_objects are used to improve performance.
338  *
339  * Returns 0 on success, ENOMEM on failure.
340  */
341 static int
342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
343 {
344 	struct vm_object *object;
345 	caddr_t buffer;
346 	vm_pindex_t npages;
347 	int error;
348 
349 	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
350 	if (size < 16384)
351 		size = 16384;
352 	if (size > 1024*1024)
353 		size = 1024*1024;
354 
355 	npages = round_page(size) / PAGE_SIZE;
356 	object = pb->object;
357 
358 	/*
359 	 * [re]create the object if necessary and reserve space for it
360 	 * in the kernel_map.  The object and memory are pageable.  On
361 	 * success, free the old resources before assigning the new
362 	 * ones.
363 	 */
364 	if (object == NULL || object->size != npages) {
365 		object = vm_object_allocate(OBJT_DEFAULT, npages);
366 		buffer = (caddr_t)vm_map_min(&kernel_map);
367 
368 		error = vm_map_find(&kernel_map, object, NULL,
369 				    0, (vm_offset_t *)&buffer, size,
370 				    PAGE_SIZE, TRUE,
371 				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
372 				    VM_PROT_ALL, VM_PROT_ALL, 0);
373 
374 		if (error != KERN_SUCCESS) {
375 			vm_object_deallocate(object);
376 			return (ENOMEM);
377 		}
378 		pipe_free_kmem(pb);
379 		pb->object = object;
380 		pb->buffer = buffer;
381 		pb->size = size;
382 	}
383 	pb->rindex = 0;
384 	pb->windex = 0;
385 
386 	return (0);
387 }
388 
389 /*
390  * Initialize and allocate VM and memory for pipe, pulling the pipe from
391  * our per-cpu cache if possible.
392  *
393  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
394  * must still deallocate the pipe on failure.
395  */
396 static int
397 pipe_create(struct pipe **pipep)
398 {
399 	globaldata_t gd = mycpu;
400 	struct pipe *pipe;
401 	int error;
402 
403 	if ((pipe = gd->gd_pipeq) != NULL) {
404 		gd->gd_pipeq = pipe->next;
405 		--gd->gd_pipeqcount;
406 		pipe->next = NULL;
407 	} else {
408 		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
409 		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
410 		lwkt_token_init(&pipe->bufferA.rlock, "piper");
411 		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
412 		lwkt_token_init(&pipe->bufferB.rlock, "piper");
413 		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
414 	}
415 	*pipep = pipe;
416 	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
417 		return (error);
418 	}
419 	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
420 		return (error);
421 	}
422 	vfs_timestamp(&pipe->ctime);
423 	pipe->bufferA.atime = pipe->ctime;
424 	pipe->bufferA.mtime = pipe->ctime;
425 	pipe->bufferB.atime = pipe->ctime;
426 	pipe->bufferB.mtime = pipe->ctime;
427 	pipe->open_count = 2;
428 
429 	return (0);
430 }
431 
432 /*
433  * Read data from a pipe
434  */
435 static int
436 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
437 {
438 	struct pipebuf *rpb;
439 	struct pipebuf *wpb;
440 	struct pipe *pipe;
441 	size_t nread = 0;
442 	size_t size;	/* total bytes available */
443 	size_t nsize;	/* total bytes to read */
444 	size_t rindex;	/* contiguous bytes available */
445 	int notify_writer;
446 	int bigread;
447 	int bigcount;
448 	int error;
449 	int nbio;
450 
451 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
452 	if ((intptr_t)fp->f_data & 1) {
453 		rpb = &pipe->bufferB;
454 		wpb = &pipe->bufferA;
455 	} else {
456 		rpb = &pipe->bufferA;
457 		wpb = &pipe->bufferB;
458 	}
459 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
460 
461 	if (uio->uio_resid == 0)
462 		return(0);
463 
464 	/*
465 	 * Calculate nbio
466 	 */
467 	if (fflags & O_FBLOCKING)
468 		nbio = 0;
469 	else if (fflags & O_FNONBLOCKING)
470 		nbio = 1;
471 	else if (fp->f_flag & O_NONBLOCK)
472 		nbio = 1;
473 	else
474 		nbio = 0;
475 
476 	/*
477 	 * 'quick' NBIO test before things get expensive.
478 	 */
479 	if (nbio && rpb->rindex == rpb->windex &&
480 	    (rpb->state & PIPE_REOF) == 0) {
481 		return EAGAIN;
482 	}
483 
484 	/*
485 	 * Reads are serialized.  Note however that buffer.buffer and
486 	 * buffer.size can change out from under us when the number
487 	 * of bytes in the buffer is zero due to the write-side doing a
488 	 * pipespace().
489 	 */
490 	lwkt_gettoken(&rpb->rlock);
491 	error = pipe_start_uio(&rpb->rip);
492 	if (error) {
493 		lwkt_reltoken(&rpb->rlock);
494 		return (error);
495 	}
496 	notify_writer = 0;
497 
498 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
499 	bigcount = 10;
500 
501 	while (uio->uio_resid) {
502 		/*
503 		 * Don't hog the cpu.
504 		 */
505 		if (bigread && --bigcount == 0) {
506 			lwkt_user_yield();
507 			bigcount = 10;
508 			if (CURSIG(curthread->td_lwp)) {
509 				error = EINTR;
510 				break;
511 			}
512 		}
513 
514 		/*
515 		 * lfence required to avoid read-reordering of buffer
516 		 * contents prior to validation of size.
517 		 */
518 		size = rpb->windex - rpb->rindex;
519 		cpu_lfence();
520 		if (size) {
521 			rindex = rpb->rindex & (rpb->size - 1);
522 			nsize = size;
523 			if (nsize > rpb->size - rindex)
524 				nsize = rpb->size - rindex;
525 			nsize = szmin(nsize, uio->uio_resid);
526 
527 			/*
528 			 * Limit how much we move in one go so we have a
529 			 * chance to kick the writer while data is still
530 			 * available in the pipe.  This avoids getting into
531 			 * a ping-pong with the writer.
532 			 */
533 			if (nsize > (rpb->size >> 1))
534 				nsize = rpb->size >> 1;
535 
536 			error = uiomove(&rpb->buffer[rindex], nsize, uio);
537 			if (error)
538 				break;
539 			rpb->rindex += nsize;
540 			nread += nsize;
541 
542 			/*
543 			 * If the FIFO is still over half full just continue
544 			 * and do not try to notify the writer yet.  If
545 			 * less than half full notify any waiting writer.
546 			 */
547 			if (size - nsize > (rpb->size >> 1)) {
548 				notify_writer = 0;
549 			} else {
550 				notify_writer = 1;
551 				pipesignal(rpb, PIPE_WANTW);
552 			}
553 			continue;
554 		}
555 
556 		/*
557 		 * If the "write-side" was blocked we wake it up.  This code
558 		 * is reached when the buffer is completely emptied.
559 		 */
560 		pipesignal(rpb, PIPE_WANTW);
561 
562 		/*
563 		 * Pick up our copy loop again if the writer sent data to
564 		 * us while we were messing around.
565 		 *
566 		 * On an SMP box poll up to pipe_delay nanoseconds for new
567 		 * data.  Typically a value of 2000 to 4000 is sufficient
568 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
569 		 * is used for synchronous communications with small packets,
570 		 * and 8000 or so (8uS) will pipeline large buffer xfers
571 		 * between cpus over a pipe.
572 		 *
573 		 * For synchronous communications a hit means doing a
574 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
575 		 * whereas a miss requiring a tsleep/wakeup sequence
576 		 * will take 7uS or more.
577 		 */
578 		if (rpb->windex != rpb->rindex)
579 			continue;
580 
581 #ifdef _RDTSC_SUPPORTED_
582 		if (pipe_delay) {
583 			int64_t tsc_target;
584 			int good = 0;
585 
586 			tsc_target = tsc_get_target(pipe_delay);
587 			while (tsc_test_target(tsc_target) == 0) {
588 				cpu_lfence();
589 				if (rpb->windex != rpb->rindex) {
590 					good = 1;
591 					break;
592 				}
593 				cpu_pause();
594 			}
595 			if (good)
596 				continue;
597 		}
598 #endif
599 
600 		/*
601 		 * Detect EOF condition, do not set error.
602 		 */
603 		if (rpb->state & PIPE_REOF)
604 			break;
605 
606 		/*
607 		 * Break if some data was read, or if this was a non-blocking
608 		 * read.
609 		 */
610 		if (nread > 0)
611 			break;
612 
613 		if (nbio) {
614 			error = EAGAIN;
615 			break;
616 		}
617 
618 		/*
619 		 * Last chance, interlock with WANTR
620 		 */
621 		tsleep_interlock(rpb, PCATCH);
622 		atomic_set_int(&rpb->state, PIPE_WANTR);
623 
624 		/*
625 		 * Retest bytes available after memory barrier above.
626 		 */
627 		size = rpb->windex - rpb->rindex;
628 		if (size)
629 			continue;
630 
631 		/*
632 		 * Retest EOF after memory barrier above.
633 		 */
634 		if (rpb->state & PIPE_REOF)
635 			break;
636 
637 		/*
638 		 * Wait for more data or state change
639 		 */
640 		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
641 		if (error)
642 			break;
643 	}
644 	pipe_end_uio(&rpb->rip);
645 
646 	/*
647 	 * Uptime last access time
648 	 * Update last access time
649 	if (error == 0 && nread)
650 		vfs_timestamp(&rpb->atime);
651 
652 	/*
653 	 * If we drained the FIFO more than half way then handle
654 	 * write blocking hysteresis.
655 	 *
656 	 * Note that PIPE_WANTW cannot be set by the writer without
657 	 * it holding both rlock and wlock, so we can test it
658 	 * while holding just rlock.
659 	 */
660 	if (notify_writer) {
661 		/*
662 		 * Synchronous blocking is done on the pipe involved
663 		 */
664 		pipesignal(rpb, PIPE_WANTW);
665 
666 		/*
667 		 * But we may also have to deal with a kqueue which is
668 		 * stored on the same pipe as its descriptor, so a
669 		 * stored on the same pipe as its descriptor, so an
670 		 * be on the other side.
671 		 */
672 		pipewakeup(wpb, 0);
673 	}
674 	/*size = rpb->windex - rpb->rindex;*/
675 	lwkt_reltoken(&rpb->rlock);
676 
677 	return (error);
678 }
679 
680 static int
681 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
682 {
683 	struct pipebuf *rpb;
684 	struct pipebuf *wpb;
685 	struct pipe *pipe;
686 	size_t windex;
687 	size_t space;
688 	size_t wcount;
689 	size_t orig_resid;
690 	int bigwrite;
691 	int bigcount;
692 	int error;
693 	int nbio;
694 
695 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
696 	if ((intptr_t)fp->f_data & 1) {
697 		rpb = &pipe->bufferB;
698 		wpb = &pipe->bufferA;
699 	} else {
700 		rpb = &pipe->bufferA;
701 		wpb = &pipe->bufferB;
702 	}
703 
704 	/*
705 	 * Calculate nbio
706 	 */
707 	if (fflags & O_FBLOCKING)
708 		nbio = 0;
709 	else if (fflags & O_FNONBLOCKING)
710 		nbio = 1;
711 	else if (fp->f_flag & O_NONBLOCK)
712 		nbio = 1;
713 	else
714 		nbio = 0;
715 
716 	/*
717 	 * 'quick' NBIO test before things get expensive.
718 	 */
719 	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
720 	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
721 		return EAGAIN;
722 	}
723 
724 	/*
725 	 * Writes go to the peer.  The peer will always exist.
726 	 */
727 	lwkt_gettoken(&wpb->wlock);
728 	if (wpb->state & PIPE_WEOF) {
729 		lwkt_reltoken(&wpb->wlock);
730 		return (EPIPE);
731 	}
732 
733 	/*
734 	 * Degenerate case (EPIPE takes precedence)
735 	 */
736 	if (uio->uio_resid == 0) {
737 		lwkt_reltoken(&wpb->wlock);
738 		return(0);
739 	}
740 
741 	/*
742 	 * Writes are serialized (start_uio must be called with wlock)
743 	 */
744 	error = pipe_start_uio(&wpb->wip);
745 	if (error) {
746 		lwkt_reltoken(&wpb->wlock);
747 		return (error);
748 	}
749 
750 	orig_resid = uio->uio_resid;
751 	wcount = 0;
752 
753 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
754 	bigcount = 10;
755 
756 	while (uio->uio_resid) {
757 		if (wpb->state & PIPE_WEOF) {
758 			error = EPIPE;
759 			break;
760 		}
761 
762 		/*
763 		 * Don't hog the cpu.
764 		 */
765 		if (bigwrite && --bigcount == 0) {
766 			lwkt_user_yield();
767 			bigcount = 10;
768 			if (CURSIG(curthread->td_lwp)) {
769 				error = EINTR;
770 				break;
771 			}
772 		}
773 
774 		windex = wpb->windex & (wpb->size - 1);
775 		space = wpb->size - (wpb->windex - wpb->rindex);
776 
777 		/*
778 		 * Writes of size <= PIPE_BUF must be atomic.
779 		 */
780 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
781 			space = 0;
782 
783 		/*
784 		 * Write to fill, read size handles write hysteresis.  Also
785 		 * additional restrictions can cause select-based non-blocking
786 		 * writes to spin.
787 		 */
788 		if (space > 0) {
789 			size_t segsize;
790 
791 			/*
792 			 * We want to notify a potentially waiting reader
793 			 * before we exhaust the write buffer for SMP
794 			 * pipelining.  Otherwise the write/read will begin
795 			 * to ping-pong.
796 			 */
797 			space = szmin(space, uio->uio_resid);
798 			if (space > (wpb->size >> 1))
799 				space = (wpb->size >> 1);
800 
801 			/*
802 			 * First segment to transfer is minimum of
803 			 * transfer size and contiguous space in
804 			 * pipe buffer.  If first segment to transfer
805 			 * is less than the transfer size, we've got
806 			 * a wraparound in the buffer.
807 			 */
808 			segsize = wpb->size - windex;
809 			if (segsize > space)
810 				segsize = space;
811 
812 			/*
813 			 * If this is the first loop and the reader is
814 			 * blocked, do a preemptive wakeup of the reader.
815 			 *
816 			 * On SMP the IPI latency plus the wlock interlock
817 			 * on the reader side is the fastest way to get the
818 			 * reader going.  (The scheduler will hard loop on
819 			 * lock tokens).
820 			 */
821 			if (wcount == 0)
822 				pipesignal(wpb, PIPE_WANTR);
823 
824 			/*
825 			 * Transfer segment, which may include a wrap-around.
826 			 * Update windex to account for both segments in one go
827 			 * so the reader can read() the data atomically.
828 			 */
829 			error = uiomove(&wpb->buffer[windex], segsize, uio);
830 			if (error == 0 && segsize < space) {
831 				segsize = space - segsize;
832 				error = uiomove(&wpb->buffer[0], segsize, uio);
833 			}
834 			if (error)
835 				break;
836 
837 			/*
838 			 * Memory fence prior to windex updating (note: not
839 			 * needed on Intel, where this is a NOP).
840 			 */
841 			cpu_sfence();
842 			wpb->windex += space;
843 
844 			/*
845 			 * Signal reader
846 			 */
847 			if (wcount != 0)
848 				pipesignal(wpb, PIPE_WANTR);
849 			wcount += space;
850 			continue;
851 		}
852 
853 		/*
854 		 * Wakeup any pending reader
855 		 */
856 		pipesignal(wpb, PIPE_WANTR);
857 
858 		/*
859 		 * don't block on non-blocking I/O
860 		 */
861 		if (nbio) {
862 			error = EAGAIN;
863 			break;
864 		}
865 
866 #ifdef _RDTSC_SUPPORTED_
867 		if (pipe_delay) {
868 			int64_t tsc_target;
869 			int good = 0;
870 
871 			tsc_target = tsc_get_target(pipe_delay);
872 			while (tsc_test_target(tsc_target) == 0) {
873 				cpu_lfence();
874 				space = wpb->size - (wpb->windex - wpb->rindex);
875 				if ((space < uio->uio_resid) &&
876 				    (orig_resid <= PIPE_BUF)) {
877 					space = 0;
878 				}
879 				if (space) {
880 					good = 1;
881 					break;
882 				}
883 				cpu_pause();
884 			}
885 			if (good)
886 				continue;
887 		}
888 #endif
889 
890 		/*
891 		 * Interlocked test.  Atomic op enforces the memory barrier.
892 		 */
893 		tsleep_interlock(wpb, PCATCH);
894 		atomic_set_int(&wpb->state, PIPE_WANTW);
895 
896 		/*
897 		 * Retest space available after memory barrier above.
898 		 * Writes of size <= PIPE_BUF must be atomic.
899 		 */
900 		space = wpb->size - (wpb->windex - wpb->rindex);
901 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
902 			space = 0;
903 
904 		/*
905 		 * Retest EOF after memory barrier above.
906 		 */
907 		if (wpb->state & PIPE_WEOF) {
908 			error = EPIPE;
909 			break;
910 		}
911 
912 		/*
913 		 * We have no more space and have something to offer,
914 		 * wake up select/poll/kq.
915 		 */
916 		if (space == 0) {
917 			pipewakeup(wpb, 1);
918 			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
919 		}
920 
921 		/*
922 		 * Break out if we errored or the read side wants us to go
923 		 * away.
924 		 */
925 		if (error)
926 			break;
927 		if (wpb->state & PIPE_WEOF) {
928 			error = EPIPE;
929 			break;
930 		}
931 	}
932 	pipe_end_uio(&wpb->wip);
933 
934 	/*
935 	 * If we have put any characters in the buffer, we wake up
936 	 * the reader.
937 	 *
938 	 * Both rlock and wlock are required to be able to modify pipe_state.
939 	 */
940 	if (wpb->windex != wpb->rindex) {
941 		pipesignal(wpb, PIPE_WANTR);
942 		pipewakeup(wpb, 1);
943 	}
944 
945 	/*
946 	 * Don't return EPIPE if I/O was successful
947 	 */
948 	if ((wpb->rindex == wpb->windex) &&
949 	    (uio->uio_resid == 0) &&
950 	    (error == EPIPE)) {
951 		error = 0;
952 	}
953 
954 	if (error == 0)
955 		vfs_timestamp(&wpb->mtime);
956 
957 	/*
958 	 * We have something to offer,
959 	 * wake up select/poll/kq.
960 	 */
961 	/*space = wpb->windex - wpb->rindex;*/
962 	lwkt_reltoken(&wpb->wlock);
963 
964 	return (error);
965 }
966 
967 /*
968  * we implement a very minimal set of ioctls for compatibility with sockets.
969  */
970 static int
971 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
972 	   struct ucred *cred, struct sysmsg *msg)
973 {
974 	struct pipebuf *rpb;
975 	struct pipe *pipe;
976 	int error;
977 
978 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
979 	if ((intptr_t)fp->f_data & 1) {
980 		rpb = &pipe->bufferB;
981 	} else {
982 		rpb = &pipe->bufferA;
983 	}
984 
985 	lwkt_gettoken(&rpb->rlock);
986 	lwkt_gettoken(&rpb->wlock);
987 
988 	switch (cmd) {
989 	case FIOASYNC:
990 		if (*(int *)data) {
991 			atomic_set_int(&rpb->state, PIPE_ASYNC);
992 		} else {
993 			atomic_clear_int(&rpb->state, PIPE_ASYNC);
994 		}
995 		error = 0;
996 		break;
997 	case FIONREAD:
998 		*(int *)data = (int)(rpb->windex - rpb->rindex);
999 		error = 0;
1000 		break;
1001 	case FIOSETOWN:
1002 		error = fsetown(*(int *)data, &rpb->sigio);
1003 		break;
1004 	case FIOGETOWN:
1005 		*(int *)data = fgetown(&rpb->sigio);
1006 		error = 0;
1007 		break;
1008 	case TIOCSPGRP:
1009 		/* This is deprecated, FIOSETOWN should be used instead. */
1010 		error = fsetown(-(*(int *)data), &rpb->sigio);
1011 		break;
1012 
1013 	case TIOCGPGRP:
1014 		/* This is deprecated, FIOGETOWN should be used instead. */
1015 		*(int *)data = -fgetown(&rpb->sigio);
1016 		error = 0;
1017 		break;
1018 	default:
1019 		error = ENOTTY;
1020 		break;
1021 	}
1022 	lwkt_reltoken(&rpb->wlock);
1023 	lwkt_reltoken(&rpb->rlock);
1024 
1025 	return (error);
1026 }
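
/*
 * Example (illustrative): FIONREAD, handled above, lets userland query
 * how many bytes are currently buffered for reading on a pipe descriptor:
 *
 *	int avail;
 *
 *	if (ioctl(fds[0], FIONREAD, &avail) == 0)
 *		printf("%d bytes buffered\n", avail);
 */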
1027 
1028 /*
1029  * MPSAFE
1030  */
1031 static int
1032 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1033 {
1034 	struct pipebuf *rpb;
1035 	struct pipe *pipe;
1036 
1037 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1038 	if ((intptr_t)fp->f_data & 1) {
1039 		rpb = &pipe->bufferB;
1040 	} else {
1041 		rpb = &pipe->bufferA;
1042 	}
1043 
1044 	bzero((caddr_t)ub, sizeof(*ub));
1045 	ub->st_mode = S_IFIFO;
1046 	ub->st_blksize = rpb->size;
1047 	ub->st_size = rpb->windex - rpb->rindex;
1048 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1049 	ub->st_atimespec = rpb->atime;
1050 	ub->st_mtimespec = rpb->mtime;
1051 	ub->st_ctimespec = pipe->ctime;
1052 	ub->st_uid = fp->f_cred->cr_uid;
1053 	ub->st_gid = fp->f_cred->cr_gid;
1054 	ub->st_ino = pipe->inum;
1055 	/*
1056 	 * Left as 0: st_dev, st_nlink, st_rdev,
1057 	 * st_flags, st_gen.
1058 	 * XXX (st_dev, st_ino) should be unique.
1059 	 */
1060 
1061 	return (0);
1062 }
1063 
1064 static int
1065 pipe_close(struct file *fp)
1066 {
1067 	struct pipebuf *rpb;
1068 	struct pipebuf *wpb;
1069 	struct pipe *pipe;
1070 
1071 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1072 	if ((intptr_t)fp->f_data & 1) {
1073 		rpb = &pipe->bufferB;
1074 		wpb = &pipe->bufferA;
1075 	} else {
1076 		rpb = &pipe->bufferA;
1077 		wpb = &pipe->bufferB;
1078 	}
1079 
1080 	fp->f_ops = &badfileops;
1081 	fp->f_data = NULL;
1082 	funsetown(&rpb->sigio);
1083 	pipeclose(pipe, rpb, wpb);
1084 
1085 	return (0);
1086 }
1087 
1088 /*
1089  * Shutdown one or both directions of a full-duplex pipe.
1090  */
1091 static int
1092 pipe_shutdown(struct file *fp, int how)
1093 {
1094 	struct pipebuf *rpb;
1095 	struct pipebuf *wpb;
1096 	struct pipe *pipe;
1097 	int error = EPIPE;
1098 
1099 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1100 	if ((intptr_t)fp->f_data & 1) {
1101 		rpb = &pipe->bufferB;
1102 		wpb = &pipe->bufferA;
1103 	} else {
1104 		rpb = &pipe->bufferA;
1105 		wpb = &pipe->bufferB;
1106 	}
1107 
1108 	/*
1109 	 * We modify pipe_state on both pipes, which means we need
1110 	 * all four tokens!
1111 	 */
1112 	lwkt_gettoken(&rpb->rlock);
1113 	lwkt_gettoken(&rpb->wlock);
1114 	lwkt_gettoken(&wpb->rlock);
1115 	lwkt_gettoken(&wpb->wlock);
1116 
1117 	switch(how) {
1118 	case SHUT_RDWR:
1119 	case SHUT_RD:
1120 		/*
1121 		 * EOF on my reads and peer writes
1122 		 */
1123 		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1124 		if (rpb->state & PIPE_WANTR) {
1125 			rpb->state &= ~PIPE_WANTR;
1126 			wakeup(rpb);
1127 		}
1128 		if (rpb->state & PIPE_WANTW) {
1129 			rpb->state &= ~PIPE_WANTW;
1130 			wakeup(rpb);
1131 		}
1132 		error = 0;
1133 		if (how == SHUT_RD)
1134 			break;
1135 		/* fall through */
1136 	case SHUT_WR:
1137 		/*
1138 		 * EOF on peer reads and my writes
1139 		 */
1140 		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1141 		if (wpb->state & PIPE_WANTR) {
1142 			wpb->state &= ~PIPE_WANTR;
1143 			wakeup(wpb);
1144 		}
1145 		if (wpb->state & PIPE_WANTW) {
1146 			wpb->state &= ~PIPE_WANTW;
1147 			wakeup(wpb);
1148 		}
1149 		error = 0;
1150 		break;
1151 	}
1152 	pipewakeup(rpb, 1);
1153 	pipewakeup(wpb, 1);
1154 
1155 	lwkt_reltoken(&wpb->wlock);
1156 	lwkt_reltoken(&wpb->rlock);
1157 	lwkt_reltoken(&rpb->wlock);
1158 	lwkt_reltoken(&rpb->rlock);
1159 
1160 	return (error);
1161 }
1162 
1163 /*
1164  * Destroy the pipe buffer.
1165  */
1166 static void
1167 pipe_free_kmem(struct pipebuf *pb)
1168 {
1169 	if (pb->buffer != NULL) {
1170 		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1171 		pb->buffer = NULL;
1172 		pb->object = NULL;
1173 	}
1174 }
1175 
1176 /*
1177  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1178  * and writing on wpb.  This routine must be called twice with the pipebufs
1179  * reversed to close both directions.
1180  */
1181 static void
1182 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1183 {
1184 	globaldata_t gd;
1185 
1186 	if (pipe == NULL)
1187 		return;
1188 
1189 	/*
1190 	 * We need both the read and write tokens to modify pipe_state.
1191 	 */
1192 	lwkt_gettoken(&rpb->rlock);
1193 	lwkt_gettoken(&rpb->wlock);
1194 
1195 	/*
1196 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1197 	 * wakeup anyone blocked on our pipe.  No action if our side
1198 	 * is already closed.
1199 	 */
1200 	if (rpb->state & PIPE_CLOSED) {
1201 		lwkt_reltoken(&rpb->wlock);
1202 		lwkt_reltoken(&rpb->rlock);
1203 		return;
1204 	}
1205 
1206 	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1207 	pipewakeup(rpb, 1);
1208 	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1209 		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1210 		wakeup(rpb);
1211 	}
1212 	lwkt_reltoken(&rpb->wlock);
1213 	lwkt_reltoken(&rpb->rlock);
1214 
1215 	/*
1216 	 * Disconnect from peer.
1217 	 */
1218 	lwkt_gettoken(&wpb->rlock);
1219 	lwkt_gettoken(&wpb->wlock);
1220 
1221 	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1222 	pipewakeup(wpb, 1);
1223 	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1224 		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1225 		wakeup(wpb);
1226 	}
1227 	if (SLIST_FIRST(&wpb->kq.ki_note))
1228 		KNOTE(&wpb->kq.ki_note, 0);
1229 	lwkt_reltoken(&wpb->wlock);
1230 	lwkt_reltoken(&wpb->rlock);
1231 
1232 	/*
1233 	 * Free resources once both sides are closed.  We maintain a pcpu
1234 	 * cache to improve performance, so the actual tear-down case is
1235 	 * limited to bulk situations.
1236 	 *
1237 	 * However, the bulk tear-down case can cause intense contention
1238 	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1239 	 * of processes are killed at the same time.  To deal with this we
1240 	 * use a pcpu mutex to maintain concurrency but also limit the
1241 	 * number of threads banging on the map and pmap.
1242 	 *
1243 	 * We use the mtx mechanism instead of the lockmgr mechanism because
1244 	 * the mtx mechanism utilizes a queued design which will not break
1245 	 * down in the face of thousands to hundreds of thousands of
1246 	 * processes trying to free pipes simultaneously.  The lockmgr
1247 	 * mechanism will wind up waking them all up each time a lock
1248 	 * cycles.
1249 	 */
1250 	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1251 		gd = mycpu;
1252 		if (gd->gd_pipeqcount >= pipe_maxcache) {
1253 			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1254 			pipe_free_kmem(rpb);
1255 			pipe_free_kmem(wpb);
1256 			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1257 			kfree(pipe, M_PIPE);
1258 		} else {
1259 			rpb->state = 0;
1260 			wpb->state = 0;
1261 			pipe->next = gd->gd_pipeq;
1262 			gd->gd_pipeq = pipe;
1263 			++gd->gd_pipeqcount;
1264 		}
1265 	}
1266 }
1267 
1268 static int
1269 pipe_kqfilter(struct file *fp, struct knote *kn)
1270 {
1271 	struct pipebuf *rpb;
1272 	struct pipebuf *wpb;
1273 	struct pipe *pipe;
1274 
1275 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1276 	if ((intptr_t)fp->f_data & 1) {
1277 		rpb = &pipe->bufferB;
1278 		wpb = &pipe->bufferA;
1279 	} else {
1280 		rpb = &pipe->bufferA;
1281 		wpb = &pipe->bufferB;
1282 	}
1283 
1284 	switch (kn->kn_filter) {
1285 	case EVFILT_READ:
1286 		kn->kn_fop = &pipe_rfiltops;
1287 		break;
1288 	case EVFILT_WRITE:
1289 		kn->kn_fop = &pipe_wfiltops;
1290 		if (wpb->state & PIPE_CLOSED) {
1291 			/* other end of pipe has been closed */
1292 			return (EPIPE);
1293 		}
1294 		break;
1295 	default:
1296 		return (EOPNOTSUPP);
1297 	}
1298 
1299 	if (rpb == &pipe->bufferA)
1300 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1301 	else
1302 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1303 
1304 	knote_insert(&rpb->kq.ki_note, kn);
1305 
1306 	return (0);
1307 }
1308 
1309 static void
1310 filt_pipedetach(struct knote *kn)
1311 {
1312 	struct pipebuf *rpb;
1313 	struct pipebuf *wpb;
1314 	struct pipe *pipe;
1315 
1316 	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1317 	if ((intptr_t)kn->kn_hook & 1) {
1318 		rpb = &pipe->bufferB;
1319 		wpb = &pipe->bufferA;
1320 	} else {
1321 		rpb = &pipe->bufferA;
1322 		wpb = &pipe->bufferB;
1323 	}
1324 	knote_remove(&rpb->kq.ki_note, kn);
1325 }
1326 
1327 /*ARGSUSED*/
1328 static int
1329 filt_piperead(struct knote *kn, long hint)
1330 {
1331 	struct pipebuf *rpb;
1332 	struct pipebuf *wpb;
1333 	struct pipe *pipe;
1334 	int ready = 0;
1335 
1336 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1337 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1338 		rpb = &pipe->bufferB;
1339 		wpb = &pipe->bufferA;
1340 	} else {
1341 		rpb = &pipe->bufferA;
1342 		wpb = &pipe->bufferB;
1343 	}
1344 
1345 	lwkt_gettoken(&rpb->rlock);
1346 	lwkt_gettoken(&rpb->wlock);
1347 
1348 	kn->kn_data = rpb->windex - rpb->rindex;
1349 
1350 	if (rpb->state & PIPE_REOF) {
1351 		/*
1352 		 * Only set NODATA if all data has been exhausted
1353 		 */
1354 		if (kn->kn_data == 0)
1355 			kn->kn_flags |= EV_NODATA;
1356 		kn->kn_flags |= EV_EOF;
1357 		ready = 1;
1358 	}
1359 
1360 	lwkt_reltoken(&rpb->wlock);
1361 	lwkt_reltoken(&rpb->rlock);
1362 
1363 	if (!ready)
1364 		ready = kn->kn_data > 0;
1365 
1366 	return (ready);
1367 }
1368 
1369 /*ARGSUSED*/
1370 static int
1371 filt_pipewrite(struct knote *kn, long hint)
1372 {
1373 	struct pipebuf *rpb;
1374 	struct pipebuf *wpb;
1375 	struct pipe *pipe;
1376 	int ready = 0;
1377 
1378 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1379 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1380 		rpb = &pipe->bufferB;
1381 		wpb = &pipe->bufferA;
1382 	} else {
1383 		rpb = &pipe->bufferA;
1384 		wpb = &pipe->bufferB;
1385 	}
1386 
1387 	kn->kn_data = 0;
1388 	if (wpb->state & PIPE_CLOSED) {
1389 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1390 		return (1);
1391 	}
1392 
1393 	lwkt_gettoken(&wpb->rlock);
1394 	lwkt_gettoken(&wpb->wlock);
1395 
1396 	if (wpb->state & PIPE_WEOF) {
1397 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1398 		ready = 1;
1399 	}
1400 
1401 	if (!ready)
1402 		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1403 
1404 	lwkt_reltoken(&wpb->wlock);
1405 	lwkt_reltoken(&wpb->rlock);
1406 
1407 	if (!ready)
1408 		ready = kn->kn_data >= PIPE_BUF;
1409 
1410 	return (ready);
1411 }
1412