xref: /dragonfly/sys/kern/sys_pipe.c (revision fb151170)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22 
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61 
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64 
65 #include <machine/cpufunc.h>
66 
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio,
71 		struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio,
73 		struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79 		struct ucred *cred, struct sysmsg *msg);
80 
81 static struct fileops pipeops = {
82 	.fo_read = pipe_read,
83 	.fo_write = pipe_write,
84 	.fo_ioctl = pipe_ioctl,
85 	.fo_kqfilter = pipe_kqfilter,
86 	.fo_stat = pipe_stat,
87 	.fo_close = pipe_close,
88 	.fo_shutdown = pipe_shutdown
89 };
90 
91 static void	filt_pipedetach(struct knote *kn);
92 static int	filt_piperead(struct knote *kn, long hint);
93 static int	filt_pipewrite(struct knote *kn, long hint);
94 
95 static struct filterops pipe_rfiltops =
96 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99 
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101 
102 /*
103  * Default pipe buffer size(s); these can be kind-of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110 
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES	64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116 
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125 
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times a reader blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times a writer blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;	/* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150 
151 /*
152  * Auto-size pipe cache to reduce kmem allocations and frees.
153  */
154 static
155 void
156 pipeinit(void *dummy)
157 {
158 	size_t mbytes = kmem_lim_size();
159 
160 	if (pipe_maxbig == LIMITBIGPIPES) {
161 		if (mbytes >= 7 * 1024)
162 			pipe_maxbig *= 2;
163 		if (mbytes >= 15 * 1024)
164 			pipe_maxbig *= 2;
165 	}
166 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
167 		if (mbytes >= 7 * 1024)
168 			pipe_maxcache *= 2;
169 		if (mbytes >= 15 * 1024)
170 			pipe_maxcache *= 2;
171 	}
172 }
173 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
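
/*
 * Illustrative effect (assuming kmem_lim_size() reports megabytes, as
 * the variable name above suggests): on a machine reporting 8GB both
 * limits double once (pipe_maxbig 64 -> 128, pipe_maxcache 16 -> 32);
 * at 16GB or more they double twice (64 -> 256 and 16 -> 64).
 */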
174 
175 static void pipeclose (struct pipe *cpipe);
176 static void pipe_free_kmem (struct pipe *cpipe);
177 static int pipe_create (struct pipe **cpipep);
178 static int pipespace (struct pipe *cpipe, int size);
179 
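/*
 * Notify anyone interested in the pipe: deliver SIGIO to the registered
 * owner if the pipe is in async mode (FIOASYNC) and dosigio is set, and
 * always post a knote so kqueue/select/poll waiters re-poll the pipe.
 */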
180 static __inline void
181 pipewakeup(struct pipe *cpipe, int dosigio)
182 {
183 	if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
184 		lwkt_gettoken(&proc_token);
185 		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
186 		lwkt_reltoken(&proc_token);
187 	}
188 	KNOTE(&cpipe->pipe_kq.ki_note, 0);
189 }
190 
191 /*
192  * These routines are called before and after a UIO.  The UIO
193  * may block, causing our held tokens to be lost temporarily.
194  *
195  * We use these routines to serialize reads against other reads
196  * and writes against other writes.
197  *
198  * The read token is held on entry so *ipp does not race.
199  */
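/*
 * The *ipp field encodes the serialization state: 0 means no uio is in
 * progress, 1 means a uio is in progress with no waiters, and -1 means
 * a uio is in progress with at least one other thread sleeping on ipp.
 * pipe_end_uio() only issues a wakeup() when it sees -1.
 */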
200 static __inline int
201 pipe_start_uio(struct pipe *cpipe, int *ipp)
202 {
203 	int error;
204 
205 	while (*ipp) {
206 		*ipp = -1;
207 		error = tsleep(ipp, PCATCH, "pipexx", 0);
208 		if (error)
209 			return (error);
210 	}
211 	*ipp = 1;
212 	return (0);
213 }
214 
215 static __inline void
216 pipe_end_uio(struct pipe *cpipe, int *ipp)
217 {
218 	if (*ipp < 0) {
219 		*ipp = 0;
220 		wakeup(ipp);
221 	} else {
222 		KKASSERT(*ipp > 0);
223 		*ipp = 0;
224 	}
225 }
226 
227 /*
228  * The pipe system call for the DTYPE_PIPE type of pipes
229  *
230  * pipe_args(int dummy)
231  *
232  * MPSAFE
233  */
234 int
235 sys_pipe(struct pipe_args *uap)
236 {
237 	struct thread *td = curthread;
238 	struct filedesc *fdp = td->td_proc->p_fd;
239 	struct file *rf, *wf;
240 	struct pipe *rpipe, *wpipe;
241 	int fd1, fd2, error;
242 
243 	rpipe = wpipe = NULL;
244 	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
245 		pipeclose(rpipe);
246 		pipeclose(wpipe);
247 		return (ENFILE);
248 	}
249 
250 	error = falloc(td->td_lwp, &rf, &fd1);
251 	if (error) {
252 		pipeclose(rpipe);
253 		pipeclose(wpipe);
254 		return (error);
255 	}
256 	uap->sysmsg_fds[0] = fd1;
257 
258 	/*
259 	 * Warning: once we've gotten past allocation of the fd for the
260 	 * read-side, we can only drop the read side via fdrop() in order
261 	 * to avoid races against processes which manage to dup() the read
262 	 * side while we are blocked trying to allocate the write side.
263 	 */
264 	rf->f_type = DTYPE_PIPE;
265 	rf->f_flag = FREAD | FWRITE;
266 	rf->f_ops = &pipeops;
267 	rf->f_data = rpipe;
268 	error = falloc(td->td_lwp, &wf, &fd2);
269 	if (error) {
270 		fsetfd(fdp, NULL, fd1);
271 		fdrop(rf);
272 		/* rpipe has been closed by fdrop(). */
273 		pipeclose(wpipe);
274 		return (error);
275 	}
276 	wf->f_type = DTYPE_PIPE;
277 	wf->f_flag = FREAD | FWRITE;
278 	wf->f_ops = &pipeops;
279 	wf->f_data = wpipe;
280 	uap->sysmsg_fds[1] = fd2;
281 
282 	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
283 				    M_PIPE, M_WAITOK|M_ZERO);
284 	wpipe->pipe_slock = rpipe->pipe_slock;
285 	rpipe->pipe_peer = wpipe;
286 	wpipe->pipe_peer = rpipe;
287 	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
288 
289 	/*
290 	 * Once activated the peer relationship remains valid until
291 	 * both sides are closed.
292 	 */
293 	fsetfd(fdp, rf, fd1);
294 	fsetfd(fdp, wf, fd2);
295 	fdrop(rf);
296 	fdrop(wf);
297 
298 	return (0);
299 }
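
/*
 * Illustrative userland use of the syscall above (not part of this
 * file); fds[0] is the read side and fds[1] the write side:
 *
 *	int fds[2];
 *	char buf[4];
 *
 *	if (pipe(fds) == 0) {
 *		write(fds[1], "hi", 2);
 *		read(fds[0], buf, sizeof(buf));
 *		close(fds[0]);
 *		close(fds[1]);
 *	}
 */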
300 
301 /*
302  * Allocate kva for the pipe circular buffer; the space is pageable.
303  * This routine will 'realloc' the size of a pipe safely.  If it
304  * fails it will retain the old buffer and return ENOMEM, leaving
305  * the pipe usable at its previous size.
306  */
307 static int
308 pipespace(struct pipe *cpipe, int size)
309 {
310 	struct vm_object *object;
311 	caddr_t buffer;
312 	int npages, error;
313 
314 	npages = round_page(size) / PAGE_SIZE;
315 	object = cpipe->pipe_buffer.object;
316 
317 	/*
318 	 * [re]create the object if necessary and reserve space for it
319 	 * in the kernel_map.  The object and memory are pageable.  On
320 	 * success, free the old resources before assigning the new
321 	 * ones.
322 	 */
323 	if (object == NULL || object->size != npages) {
324 		object = vm_object_allocate(OBJT_DEFAULT, npages);
325 		buffer = (caddr_t)vm_map_min(&kernel_map);
326 
327 		error = vm_map_find(&kernel_map, object, 0,
328 				    (vm_offset_t *)&buffer,
329 				    size, PAGE_SIZE,
330 				    1, VM_MAPTYPE_NORMAL,
331 				    VM_PROT_ALL, VM_PROT_ALL,
332 				    0);
333 
334 		if (error != KERN_SUCCESS) {
335 			vm_object_deallocate(object);
336 			return (ENOMEM);
337 		}
338 		pipe_free_kmem(cpipe);
339 		cpipe->pipe_buffer.object = object;
340 		cpipe->pipe_buffer.buffer = buffer;
341 		cpipe->pipe_buffer.size = size;
342 		++pipe_bkmem_alloc;
343 	} else {
344 		++pipe_bcache_alloc;
345 	}
346 	cpipe->pipe_buffer.rindex = 0;
347 	cpipe->pipe_buffer.windex = 0;
348 	return (0);
349 }
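
/*
 * Note on the index convention used throughout this file: rindex and
 * windex are free-running unsigned counters.  The number of bytes in
 * the FIFO is (windex - rindex) and the physical offset of an index is
 * (index & (size - 1)), which relies on the buffer size being a power
 * of 2.
 *
 * Illustrative example: with size = 16384, rindex = 16380 and
 * windex = 16390 there are 10 bytes pending; 4 of them are contiguous
 * at offset 16380 and the remaining 6 wrap around to offset 0.
 */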
350 
351 /*
352  * Initialize and allocate VM and memory for pipe, pulling the pipe from
353  * our per-cpu cache if possible.  For now make sure it is sized for the
354  * smaller PIPE_SIZE default.
355  */
356 static int
357 pipe_create(struct pipe **cpipep)
358 {
359 	globaldata_t gd = mycpu;
360 	struct pipe *cpipe;
361 	int error;
362 
363 	if ((cpipe = gd->gd_pipeq) != NULL) {
364 		gd->gd_pipeq = cpipe->pipe_peer;
365 		--gd->gd_pipeqcount;
366 		cpipe->pipe_peer = NULL;
367 		cpipe->pipe_wantwcnt = 0;
368 	} else {
369 		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
370 	}
371 	*cpipep = cpipe;
372 	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
373 		return (error);
374 	vfs_timestamp(&cpipe->pipe_ctime);
375 	cpipe->pipe_atime = cpipe->pipe_ctime;
376 	cpipe->pipe_mtime = cpipe->pipe_ctime;
377 	lwkt_token_init(&cpipe->pipe_rlock, "piper");
378 	lwkt_token_init(&cpipe->pipe_wlock, "pipew");
379 	return (0);
380 }
381 
382 static int
383 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
384 {
385 	struct pipe *rpipe;
386 	struct pipe *wpipe;
387 	int error;
388 	size_t nread = 0;
389 	int nbio;
390 	u_int size;	/* total bytes available */
391 	u_int nsize;	/* total bytes to read */
392 	u_int rindex;	/* contiguous bytes available */
393 	int notify_writer;
394 	int bigread;
395 	int bigcount;
396 
397 	if (uio->uio_resid == 0)
398 		return(0);
399 
400 	/*
401 	 * Setup locks, calculate nbio
402 	 */
403 	rpipe = (struct pipe *)fp->f_data;
404 	wpipe = rpipe->pipe_peer;
405 	lwkt_gettoken(&rpipe->pipe_rlock);
406 
407 	if (fflags & O_FBLOCKING)
408 		nbio = 0;
409 	else if (fflags & O_FNONBLOCKING)
410 		nbio = 1;
411 	else if (fp->f_flag & O_NONBLOCK)
412 		nbio = 1;
413 	else
414 		nbio = 0;
415 
416 	/*
417 	 * Reads are serialized.  Note however that pipe_buffer.buffer and
418 	 * pipe_buffer.size can change out from under us when the number
419 	 * of bytes in the buffer is zero due to the write-side doing a
420 	 * pipespace().
421 	 */
422 	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
423 	if (error) {
424 		lwkt_reltoken(&rpipe->pipe_rlock);
425 		return (error);
426 	}
427 	notify_writer = 0;
428 
429 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
430 	bigcount = 10;
431 
432 	while (uio->uio_resid) {
433 		/*
434 		 * Don't hog the cpu.
435 		 */
436 		if (bigread && --bigcount == 0) {
437 			lwkt_user_yield();
438 			bigcount = 10;
439 			if (CURSIG(curthread->td_lwp)) {
440 				error = EINTR;
441 				break;
442 			}
443 		}
444 
445 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
446 		cpu_lfence();
447 		if (size) {
448 			rindex = rpipe->pipe_buffer.rindex &
449 				 (rpipe->pipe_buffer.size - 1);
450 			nsize = size;
451 			if (nsize > rpipe->pipe_buffer.size - rindex)
452 				nsize = rpipe->pipe_buffer.size - rindex;
453 			nsize = szmin(nsize, uio->uio_resid);
454 
455 			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
456 					nsize, uio);
457 			if (error)
458 				break;
459 			cpu_mfence();
460 			rpipe->pipe_buffer.rindex += nsize;
461 			nread += nsize;
462 
463 			/*
464 			 * If the FIFO is still over half full just continue
465 			 * and do not try to notify the writer yet.
466 			 */
467 			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
468 				notify_writer = 0;
469 				continue;
470 			}
471 
472 			/*
473 			 * When the FIFO is less than half full, notify any
474 			 * waiting writer.  WANTW can be checked while
475 			 * holding just the rlock.
476 			 */
477 			notify_writer = 1;
478 			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
479 				continue;
480 		}
481 
482 		/*
483 		 * If the "write-side" was blocked we wake it up.  This code
484 		 * is reached either when the buffer is completely emptied
485 		 * or if it becomes more than half-empty.
486 		 *
487 		 * Pipe_state can only be modified if both the rlock and
488 		 * wlock are held.
489 		 */
490 		if (rpipe->pipe_state & PIPE_WANTW) {
491 			lwkt_gettoken(&rpipe->pipe_wlock);
492 			if (rpipe->pipe_state & PIPE_WANTW) {
493 				rpipe->pipe_state &= ~PIPE_WANTW;
494 				lwkt_reltoken(&rpipe->pipe_wlock);
495 				wakeup(rpipe);
496 			} else {
497 				lwkt_reltoken(&rpipe->pipe_wlock);
498 			}
499 		}
500 
501 		/*
502 		 * Pick up our copy loop again if the writer sent data to
503 		 * us while we were messing around.
504 		 *
505 		 * On an SMP box poll up to pipe_delay nanoseconds for new
506 		 * data.  Typically a value of 2000 to 4000 is sufficient
507 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
508 		 * is used for synchronous communications with small packets,
509 		 * and 8000 or so (8uS) will pipeline large buffer xfers
510 		 * between cpus over a pipe.
511 		 *
512 		 * For synchronous communications a hit means doing a
513 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
514 		 * whereas a miss requiring a tsleep/wakeup sequence
515 		 * will take 7uS or more.
516 		 */
517 		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
518 			continue;
519 
520 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
521 		if (pipe_delay) {
522 			int64_t tsc_target;
523 			int good = 0;
524 
525 			tsc_target = tsc_get_target(pipe_delay);
526 			while (tsc_test_target(tsc_target) == 0) {
527 				if (rpipe->pipe_buffer.windex !=
528 				    rpipe->pipe_buffer.rindex) {
529 					good = 1;
530 					break;
531 				}
532 			}
533 			if (good)
534 				continue;
535 		}
536 #endif
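
		/*
		 * In the SMP case above, the tsc_get_target()/tsc_test_target()
		 * pair bounds the busy-wait: we spin until either new data
		 * shows up or roughly pipe_delay nanoseconds have elapsed, and
		 * only then fall through to the blocking path below.
		 */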
537 
538 		/*
539 		 * Detect EOF condition, do not set error.
540 		 */
541 		if (rpipe->pipe_state & PIPE_REOF)
542 			break;
543 
544 		/*
545 		 * Break if some data was read, or if this was a non-blocking
546 		 * read.
547 		 */
548 		if (nread > 0)
549 			break;
550 
551 		if (nbio) {
552 			error = EAGAIN;
553 			break;
554 		}
555 
556 		/*
557 		 * Last chance, interlock with WANTR.
558 		 */
559 		lwkt_gettoken(&rpipe->pipe_wlock);
560 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
561 		if (size) {
562 			lwkt_reltoken(&rpipe->pipe_wlock);
563 			continue;
564 		}
565 
566 		/*
567 		 * Retest EOF - acquiring a new token can temporarily release
568 		 * tokens already held.
569 		 */
570 		if (rpipe->pipe_state & PIPE_REOF) {
571 			lwkt_reltoken(&rpipe->pipe_wlock);
572 			break;
573 		}
574 
575 		/*
576 		 * If there is no more to read in the pipe, reset its
577 		 * pointers to the beginning.  This improves cache hit
578 		 * stats.
579 		 *
580 		 * We need both locks to modify both pointers, and there
581 		 * must also not be a write in progress or the uiomove()
582 		 * in the write might block and temporarily release
583 		 * its wlock, then reacquire and update windex.  We are
584 		 * only serialized against reads, not writes.
585 		 *
586 		 * XXX should we even bother resetting the indices?  It
587 		 *     might actually be more cache efficient not to.
588 		 */
589 		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
590 		    rpipe->pipe_wip == 0) {
591 			rpipe->pipe_buffer.rindex = 0;
592 			rpipe->pipe_buffer.windex = 0;
593 		}
594 
595 		/*
596 		 * Wait for more data.
597 		 *
598 		 * Pipe_state can only be set if both the rlock and wlock
599 		 * are held.
600 		 */
601 		rpipe->pipe_state |= PIPE_WANTR;
602 		tsleep_interlock(rpipe, PCATCH);
603 		lwkt_reltoken(&rpipe->pipe_wlock);
604 		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
605 		++pipe_rblocked_count;
606 		if (error)
607 			break;
608 	}
609 	pipe_end_uio(rpipe, &rpipe->pipe_rip);
610 
611 	/*
612 	 * Update the last access time
613 	 */
614 	if (error == 0 && nread)
615 		vfs_timestamp(&rpipe->pipe_atime);
616 
617 	/*
618 	 * If we drained the FIFO more than half way then handle
619 	 * write blocking hysteresis.
620 	 *
621 	 * Note that PIPE_WANTW cannot be set by the writer without
622 	 * it holding both rlock and wlock, so we can test it
623 	 * while holding just rlock.
624 	 */
625 	if (notify_writer) {
626 		/*
627 		 * Synchronous blocking is done on the pipe involved
628 		 */
629 		if (rpipe->pipe_state & PIPE_WANTW) {
630 			lwkt_gettoken(&rpipe->pipe_wlock);
631 			if (rpipe->pipe_state & PIPE_WANTW) {
632 				rpipe->pipe_state &= ~PIPE_WANTW;
633 				lwkt_reltoken(&rpipe->pipe_wlock);
634 				wakeup(rpipe);
635 			} else {
636 				lwkt_reltoken(&rpipe->pipe_wlock);
637 			}
638 		}
639 
640 		/*
641 		 * But we may also have to deal with a kqueue which is
642 		 * stored on the same pipe as its descriptor, so a
643 		 * stored on the same pipe as its descriptor, so an
644 		 * be on the other side.
645 		 */
646 		lwkt_gettoken(&wpipe->pipe_wlock);
647 		pipewakeup(wpipe, 0);
648 		lwkt_reltoken(&wpipe->pipe_wlock);
649 	}
650 	/*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
651 	lwkt_reltoken(&rpipe->pipe_rlock);
652 
653 	return (error);
654 }
655 
656 static int
657 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
658 {
659 	int error;
660 	int orig_resid;
661 	int nbio;
662 	struct pipe *wpipe;
663 	struct pipe *rpipe;
664 	u_int windex;
665 	u_int space;
666 	u_int wcount;
667 	int bigwrite;
668 	int bigcount;
669 
670 	/*
671 	 * Writes go to the peer.  The peer will always exist.
672 	 */
673 	rpipe = (struct pipe *) fp->f_data;
674 	wpipe = rpipe->pipe_peer;
675 	lwkt_gettoken(&wpipe->pipe_wlock);
676 	if (wpipe->pipe_state & PIPE_WEOF) {
677 		lwkt_reltoken(&wpipe->pipe_wlock);
678 		return (EPIPE);
679 	}
680 
681 	/*
682 	 * Degenerate case (EPIPE takes prec)
683 	 * Degenerate case (EPIPE takes precedence)
684 	if (uio->uio_resid == 0) {
685 		lwkt_reltoken(&wpipe->pipe_wlock);
686 		return(0);
687 	}
688 
689 	/*
690 	 * Writes are serialized (start_uio must be called with wlock)
691 	 */
692 	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
693 	if (error) {
694 		lwkt_reltoken(&wpipe->pipe_wlock);
695 		return (error);
696 	}
697 
698 	if (fflags & O_FBLOCKING)
699 		nbio = 0;
700 	else if (fflags & O_FNONBLOCKING)
701 		nbio = 1;
702 	else if (fp->f_flag & O_NONBLOCK)
703 		nbio = 1;
704 	else
705 		nbio = 0;
706 
707 	/*
708 	 * If it is advantageous to resize the pipe buffer, do
709 	 * so.  We are write-serialized so we can block safely.
710 	 */
711 	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
712 	    (pipe_nbig < pipe_maxbig) &&
713 	    wpipe->pipe_wantwcnt > 4 &&
714 	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
715 		/*
716 		 * Recheck after lock.
717 		 */
718 		lwkt_gettoken(&wpipe->pipe_rlock);
719 		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
720 		    (pipe_nbig < pipe_maxbig) &&
721 		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
722 			atomic_add_int(&pipe_nbig, 1);
723 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
724 				++pipe_bigcount;
725 			else
726 				atomic_subtract_int(&pipe_nbig, 1);
727 		}
728 		lwkt_reltoken(&wpipe->pipe_rlock);
729 	}
730 
731 	orig_resid = uio->uio_resid;
732 	wcount = 0;
733 
734 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
735 	bigcount = 10;
736 
737 	while (uio->uio_resid) {
738 		if (wpipe->pipe_state & PIPE_WEOF) {
739 			error = EPIPE;
740 			break;
741 		}
742 
743 		/*
744 		 * Don't hog the cpu.
745 		 */
746 		if (bigwrite && --bigcount == 0) {
747 			lwkt_user_yield();
748 			bigcount = 10;
749 			if (CURSIG(curthread->td_lwp)) {
750 				error = EINTR;
751 				break;
752 			}
753 		}
754 
755 		windex = wpipe->pipe_buffer.windex &
756 			 (wpipe->pipe_buffer.size - 1);
757 		space = wpipe->pipe_buffer.size -
758 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
759 		cpu_lfence();
760 
761 		/* Writes of size <= PIPE_BUF must be atomic. */
762 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
763 			space = 0;
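		/*
		 * (POSIX requires writes of PIPE_BUF bytes or less to be
		 * atomic, so rather than storing a partial chunk of such a
		 * write we report no space at all and wait until the whole
		 * write fits in one piece.)
		 */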
764 
765 		/*
766 		 * We write to fill; the read side handles write hysteresis.  Also
767 		 * additional restrictions can cause select-based non-blocking
768 		 * writes to spin.
769 		 */
770 		if (space > 0) {
771 			u_int segsize;
772 
773 			/*
774 			 * Transfer size is minimum of uio transfer
775 			 * and free space in pipe buffer.
776 			 *
777 			 * Limit each uiocopy to no more than PIPE_SIZE
778 			 * so we can keep the gravy train going on a
779 			 * SMP box.  This doubles the performance for
780 			 * write sizes > 16K.  Otherwise large writes
781 			 * wind up doing an inefficient synchronous
782 			 * ping-pong.
783 			 */
784 			space = szmin(space, uio->uio_resid);
785 			if (space > PIPE_SIZE)
786 				space = PIPE_SIZE;
787 
788 			/*
789 			 * First segment to transfer is minimum of
790 			 * transfer size and contiguous space in
791 			 * pipe buffer.  If first segment to transfer
792 			 * is less than the transfer size, we've got
793 			 * a wraparound in the buffer.
794 			 */
795 			segsize = wpipe->pipe_buffer.size - windex;
796 			if (segsize > space)
797 				segsize = space;
798 
799 #ifdef SMP
800 			/*
801 			 * If this is the first loop and the reader is
802 			 * blocked, do a preemptive wakeup of the reader.
803 			 *
804 			 * On SMP the IPI latency plus the wlock interlock
805 			 * on the reader side is the fastest way to get the
806 			 * reader going.  (The scheduler will hard loop on
807 			 * lock tokens).
808 			 *
809 			 * NOTE: We can't clear WANTR here without acquiring
810 			 * the rlock, which we don't want to do here!
811 			 */
812 			if ((wpipe->pipe_state & PIPE_WANTR))
813 				wakeup(wpipe);
814 #endif
815 
816 			/*
817 			 * Transfer segment, which may include a wrap-around.
818 			 * Update windex to account for both all in one go
819 			 * so the reader can read() the data atomically.
820 			 */
821 			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
822 					segsize, uio);
823 			if (error == 0 && segsize < space) {
824 				segsize = space - segsize;
825 				error = uiomove(&wpipe->pipe_buffer.buffer[0],
826 						segsize, uio);
827 			}
828 			if (error)
829 				break;
830 			cpu_mfence();
831 			wpipe->pipe_buffer.windex += space;
832 			wcount += space;
833 			continue;
834 		}
835 
836 		/*
837 		 * We need both the rlock and the wlock to interlock against
838 		 * the EOF, WANTW, and size checks, and to modify pipe_state.
839 		 *
840 		 * These are token locks so we do not have to worry about
841 		 * deadlocks.
842 		 */
843 		lwkt_gettoken(&wpipe->pipe_rlock);
844 
845 		/*
846 		 * If the "read-side" has been blocked, wake it up now
847 		 * and yield to let it drain synchronously rather
848 		 * than block.
849 		 */
850 		if (wpipe->pipe_state & PIPE_WANTR) {
851 			wpipe->pipe_state &= ~PIPE_WANTR;
852 			wakeup(wpipe);
853 		}
854 
855 		/*
856 		 * don't block on non-blocking I/O
857 		 */
858 		if (nbio) {
859 			lwkt_reltoken(&wpipe->pipe_rlock);
860 			error = EAGAIN;
861 			break;
862 		}
863 
864 		/*
865 		 * re-test whether we have to block in the writer after
866 		 * acquiring both locks, in case the reader opened up
867 		 * some space.
868 		 */
869 		space = wpipe->pipe_buffer.size -
870 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
871 		cpu_lfence();
872 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
873 			space = 0;
874 
875 		/*
876 		 * Retest EOF - acquiring a new token can temporarily release
877 		 * tokens already held.
878 		 */
879 		if (wpipe->pipe_state & PIPE_WEOF) {
880 			lwkt_reltoken(&wpipe->pipe_rlock);
881 			error = EPIPE;
882 			break;
883 		}
884 
885 		/*
886 		 * We have no more space and have something to offer,
887 		 * wake up select/poll/kq.
888 		 */
889 		if (space == 0) {
890 			wpipe->pipe_state |= PIPE_WANTW;
891 			++wpipe->pipe_wantwcnt;
892 			pipewakeup(wpipe, 1);
893 			if (wpipe->pipe_state & PIPE_WANTW)
894 				error = tsleep(wpipe, PCATCH, "pipewr", 0);
895 			++pipe_wblocked_count;
896 		}
897 		lwkt_reltoken(&wpipe->pipe_rlock);
898 
899 		/*
900 		 * Break out if we errored or the read side wants us to go
901 		 * away.
902 		 */
903 		if (error)
904 			break;
905 		if (wpipe->pipe_state & PIPE_WEOF) {
906 			error = EPIPE;
907 			break;
908 		}
909 	}
910 	pipe_end_uio(wpipe, &wpipe->pipe_wip);
911 
912 	/*
913 	 * If we have put any characters in the buffer, we wake up
914 	 * the reader.
915 	 *
916 	 * Both rlock and wlock are required to be able to modify pipe_state.
917 	 */
918 	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
919 		if (wpipe->pipe_state & PIPE_WANTR) {
920 			lwkt_gettoken(&wpipe->pipe_rlock);
921 			if (wpipe->pipe_state & PIPE_WANTR) {
922 				wpipe->pipe_state &= ~PIPE_WANTR;
923 				lwkt_reltoken(&wpipe->pipe_rlock);
924 				wakeup(wpipe);
925 			} else {
926 				lwkt_reltoken(&wpipe->pipe_rlock);
927 			}
928 		}
929 		lwkt_gettoken(&wpipe->pipe_rlock);
930 		pipewakeup(wpipe, 1);
931 		lwkt_reltoken(&wpipe->pipe_rlock);
932 	}
933 
934 	/*
935 	 * Don't return EPIPE if I/O was successful
936 	 */
937 	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
938 	    (uio->uio_resid == 0) &&
939 	    (error == EPIPE)) {
940 		error = 0;
941 	}
942 
943 	if (error == 0)
944 		vfs_timestamp(&wpipe->pipe_mtime);
945 
946 	/*
947 	 * We have something to offer,
948 	 * wake up select/poll/kq.
949 	 */
950 	/*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
951 	lwkt_reltoken(&wpipe->pipe_wlock);
952 	return (error);
953 }
954 
955 /*
956  * We implement a very minimal set of ioctls for compatibility with sockets.
957  */
958 int
959 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
960 	   struct ucred *cred, struct sysmsg *msg)
961 {
962 	struct pipe *mpipe;
963 	int error;
964 
965 	mpipe = (struct pipe *)fp->f_data;
966 
967 	lwkt_gettoken(&mpipe->pipe_rlock);
968 	lwkt_gettoken(&mpipe->pipe_wlock);
969 
970 	switch (cmd) {
971 	case FIOASYNC:
972 		if (*(int *)data) {
973 			mpipe->pipe_state |= PIPE_ASYNC;
974 		} else {
975 			mpipe->pipe_state &= ~PIPE_ASYNC;
976 		}
977 		error = 0;
978 		break;
979 	case FIONREAD:
980 		*(int *)data = mpipe->pipe_buffer.windex -
981 				mpipe->pipe_buffer.rindex;
982 		error = 0;
983 		break;
984 	case FIOSETOWN:
985 		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
986 		break;
987 	case FIOGETOWN:
988 		*(int *)data = fgetown(&mpipe->pipe_sigio);
989 		error = 0;
990 		break;
991 	case TIOCSPGRP:
992 		/* This is deprecated, FIOSETOWN should be used instead. */
993 		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
994 		break;
995 
996 	case TIOCGPGRP:
997 		/* This is deprecated, FIOGETOWN should be used instead. */
998 		*(int *)data = -fgetown(&mpipe->pipe_sigio);
999 		error = 0;
1000 		break;
1001 	default:
1002 		error = ENOTTY;
1003 		break;
1004 	}
1005 	lwkt_reltoken(&mpipe->pipe_wlock);
1006 	lwkt_reltoken(&mpipe->pipe_rlock);
1007 
1008 	return (error);
1009 }
1010 
1011 /*
1012  * MPSAFE
1013  */
1014 static int
1015 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1016 {
1017 	struct pipe *pipe;
1018 
1019 	pipe = (struct pipe *)fp->f_data;
1020 
1021 	bzero((caddr_t)ub, sizeof(*ub));
1022 	ub->st_mode = S_IFIFO;
1023 	ub->st_blksize = pipe->pipe_buffer.size;
1024 	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1025 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1026 	ub->st_atimespec = pipe->pipe_atime;
1027 	ub->st_mtimespec = pipe->pipe_mtime;
1028 	ub->st_ctimespec = pipe->pipe_ctime;
1029 	/*
1030 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1031 	 * st_flags, st_gen.
1032 	 * XXX (st_dev, st_ino) should be unique.
1033 	 */
1034 	return (0);
1035 }
1036 
1037 static int
1038 pipe_close(struct file *fp)
1039 {
1040 	struct pipe *cpipe;
1041 
1042 	cpipe = (struct pipe *)fp->f_data;
1043 	fp->f_ops = &badfileops;
1044 	fp->f_data = NULL;
1045 	funsetown(&cpipe->pipe_sigio);
1046 	pipeclose(cpipe);
1047 	return (0);
1048 }
1049 
1050 /*
1051  * Shutdown one or both directions of a full-duplex pipe.
1052  */
1053 static int
1054 pipe_shutdown(struct file *fp, int how)
1055 {
1056 	struct pipe *rpipe;
1057 	struct pipe *wpipe;
1058 	int error = EPIPE;
1059 
1060 	rpipe = (struct pipe *)fp->f_data;
1061 	wpipe = rpipe->pipe_peer;
1062 
1063 	/*
1064 	 * We modify pipe_state on both pipes, which means we need
1065 	 * all four tokens!
1066 	 */
1067 	lwkt_gettoken(&rpipe->pipe_rlock);
1068 	lwkt_gettoken(&rpipe->pipe_wlock);
1069 	lwkt_gettoken(&wpipe->pipe_rlock);
1070 	lwkt_gettoken(&wpipe->pipe_wlock);
1071 
1072 	switch(how) {
1073 	case SHUT_RDWR:
1074 	case SHUT_RD:
1075 		rpipe->pipe_state |= PIPE_REOF;		/* my reads */
1076 		rpipe->pipe_state |= PIPE_WEOF;		/* peer writes */
1077 		if (rpipe->pipe_state & PIPE_WANTR) {
1078 			rpipe->pipe_state &= ~PIPE_WANTR;
1079 			wakeup(rpipe);
1080 		}
1081 		if (rpipe->pipe_state & PIPE_WANTW) {
1082 			rpipe->pipe_state &= ~PIPE_WANTW;
1083 			wakeup(rpipe);
1084 		}
1085 		error = 0;
1086 		if (how == SHUT_RD)
1087 			break;
1088 		/* fall through */
1089 	case SHUT_WR:
1090 		wpipe->pipe_state |= PIPE_REOF;		/* peer reads */
1091 		wpipe->pipe_state |= PIPE_WEOF;		/* my writes */
1092 		if (wpipe->pipe_state & PIPE_WANTR) {
1093 			wpipe->pipe_state &= ~PIPE_WANTR;
1094 			wakeup(wpipe);
1095 		}
1096 		if (wpipe->pipe_state & PIPE_WANTW) {
1097 			wpipe->pipe_state &= ~PIPE_WANTW;
1098 			wakeup(wpipe);
1099 		}
1100 		error = 0;
1101 		break;
1102 	}
1103 	pipewakeup(rpipe, 1);
1104 	pipewakeup(wpipe, 1);
1105 
1106 	lwkt_reltoken(&wpipe->pipe_wlock);
1107 	lwkt_reltoken(&wpipe->pipe_rlock);
1108 	lwkt_reltoken(&rpipe->pipe_wlock);
1109 	lwkt_reltoken(&rpipe->pipe_rlock);
1110 
1111 	return (error);
1112 }
1113 
1114 static void
1115 pipe_free_kmem(struct pipe *cpipe)
1116 {
1117 	if (cpipe->pipe_buffer.buffer != NULL) {
1118 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1119 			atomic_subtract_int(&pipe_nbig, 1);
1120 		kmem_free(&kernel_map,
1121 			(vm_offset_t)cpipe->pipe_buffer.buffer,
1122 			cpipe->pipe_buffer.size);
1123 		cpipe->pipe_buffer.buffer = NULL;
1124 		cpipe->pipe_buffer.object = NULL;
1125 	}
1126 }
1127 
1128 /*
1129  * Close the pipe.  The slock must be held to interlock against simultaneous
1130  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1131  */
1132 static void
1133 pipeclose(struct pipe *cpipe)
1134 {
1135 	globaldata_t gd;
1136 	struct pipe *ppipe;
1137 
1138 	if (cpipe == NULL)
1139 		return;
1140 
1141 	/*
1142 	 * The slock may not have been allocated yet (close during
1143 	 * initialization)
1144 	 *
1145 	 * We need both the read and write tokens to modify pipe_state.
1146 	 */
1147 	if (cpipe->pipe_slock)
1148 		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1149 	lwkt_gettoken(&cpipe->pipe_rlock);
1150 	lwkt_gettoken(&cpipe->pipe_wlock);
1151 
1152 	/*
1153 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1154 	 * wakeup anyone blocked on our pipe.
1155 	 */
1156 	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1157 	pipewakeup(cpipe, 1);
1158 	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1159 		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1160 		wakeup(cpipe);
1161 	}
1162 
1163 	/*
1164 	 * Disconnect from peer.
1165 	 */
1166 	if ((ppipe = cpipe->pipe_peer) != NULL) {
1167 		lwkt_gettoken(&ppipe->pipe_rlock);
1168 		lwkt_gettoken(&ppipe->pipe_wlock);
1169 		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1170 		pipewakeup(ppipe, 1);
1171 		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1172 			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1173 			wakeup(ppipe);
1174 		}
1175 		if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1176 			KNOTE(&ppipe->pipe_kq.ki_note, 0);
1177 		lwkt_reltoken(&ppipe->pipe_wlock);
1178 		lwkt_reltoken(&ppipe->pipe_rlock);
1179 	}
1180 
1181 	/*
1182 	 * If the peer is also closed we can free resources for both
1183 	 * sides, otherwise we leave our side intact to deal with any
1184 	 * races (since we only have the slock).
1185 	 */
1186 	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1187 		cpipe->pipe_peer = NULL;
1188 		ppipe->pipe_peer = NULL;
1189 		ppipe->pipe_slock = NULL;	/* we will free the slock */
1190 		pipeclose(ppipe);
1191 		ppipe = NULL;
1192 	}
1193 
1194 	lwkt_reltoken(&cpipe->pipe_wlock);
1195 	lwkt_reltoken(&cpipe->pipe_rlock);
1196 	if (cpipe->pipe_slock)
1197 		lockmgr(cpipe->pipe_slock, LK_RELEASE);
1198 
1199 	/*
1200 	 * If we disassociated from our peer we can free resources
1201 	 */
1202 	if (ppipe == NULL) {
1203 		gd = mycpu;
1204 		if (cpipe->pipe_slock) {
1205 			kfree(cpipe->pipe_slock, M_PIPE);
1206 			cpipe->pipe_slock = NULL;
1207 		}
1208 		if (gd->gd_pipeqcount >= pipe_maxcache ||
1209 		    cpipe->pipe_buffer.size != PIPE_SIZE
1210 		) {
1211 			pipe_free_kmem(cpipe);
1212 			kfree(cpipe, M_PIPE);
1213 		} else {
1214 			cpipe->pipe_state = 0;
1215 			cpipe->pipe_peer = gd->gd_pipeq;
1216 			gd->gd_pipeq = cpipe;
1217 			++gd->gd_pipeqcount;
1218 		}
1219 	}
1220 }
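
/*
 * Note: default-sized pipes released above are recycled through a small
 * per-cpu free list (gd_pipeq) instead of being returned to kmem right
 * away; the kern.pipe.maxcache sysctl bounds the size of that list.
 */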
1221 
1222 static int
1223 pipe_kqfilter(struct file *fp, struct knote *kn)
1224 {
1225 	struct pipe *cpipe;
1226 
1227 	cpipe = (struct pipe *)kn->kn_fp->f_data;
1228 
1229 	switch (kn->kn_filter) {
1230 	case EVFILT_READ:
1231 		kn->kn_fop = &pipe_rfiltops;
1232 		break;
1233 	case EVFILT_WRITE:
1234 		kn->kn_fop = &pipe_wfiltops;
1235 		if (cpipe->pipe_peer == NULL) {
1236 			/* other end of pipe has been closed */
1237 			return (EPIPE);
1238 		}
1239 		break;
1240 	default:
1241 		return (EOPNOTSUPP);
1242 	}
1243 	kn->kn_hook = (caddr_t)cpipe;
1244 
1245 	knote_insert(&cpipe->pipe_kq.ki_note, kn);
1246 
1247 	return (0);
1248 }
1249 
1250 static void
1251 filt_pipedetach(struct knote *kn)
1252 {
1253 	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1254 
1255 	knote_remove(&cpipe->pipe_kq.ki_note, kn);
1256 }
1257 
1258 /*ARGSUSED*/
1259 static int
1260 filt_piperead(struct knote *kn, long hint)
1261 {
1262 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1263 	int ready = 0;
1264 
1265 	lwkt_gettoken(&rpipe->pipe_rlock);
1266 	lwkt_gettoken(&rpipe->pipe_wlock);
1267 
1268 	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1269 
1270 	if (rpipe->pipe_state & PIPE_REOF) {
1271 		/*
1272 		 * Only set NODATA if all data has been exhausted
1273 		 */
1274 		if (kn->kn_data == 0)
1275 			kn->kn_flags |= EV_NODATA;
1276 		kn->kn_flags |= EV_EOF;
1277 		ready = 1;
1278 	}
1279 
1280 	lwkt_reltoken(&rpipe->pipe_wlock);
1281 	lwkt_reltoken(&rpipe->pipe_rlock);
1282 
1283 	if (!ready)
1284 		ready = kn->kn_data > 0;
1285 
1286 	return (ready);
1287 }
1288 
1289 /*ARGSUSED*/
1290 static int
1291 filt_pipewrite(struct knote *kn, long hint)
1292 {
1293 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1294 	struct pipe *wpipe = rpipe->pipe_peer;
1295 	int ready = 0;
1296 
1297 	kn->kn_data = 0;
1298 	if (wpipe == NULL) {
1299 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1300 		return (1);
1301 	}
1302 
1303 	lwkt_gettoken(&wpipe->pipe_rlock);
1304 	lwkt_gettoken(&wpipe->pipe_wlock);
1305 
1306 	if (wpipe->pipe_state & PIPE_WEOF) {
1307 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1308 		ready = 1;
1309 	}
1310 
1311 	if (!ready)
1312 		kn->kn_data = wpipe->pipe_buffer.size -
1313 			      (wpipe->pipe_buffer.windex -
1314 			       wpipe->pipe_buffer.rindex);
1315 
1316 	lwkt_reltoken(&wpipe->pipe_wlock);
1317 	lwkt_reltoken(&wpipe->pipe_rlock);
1318 
1319 	if (!ready)
1320 		ready = kn->kn_data >= PIPE_BUF;
1321 
1322 	return (ready);
1323 }
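
/*
 * Note that the write filter above only reports the pipe as writable
 * once at least PIPE_BUF bytes of space are available, matching the
 * atomic small-write guarantee enforced in pipe_write().
 */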
1324