xref: /dragonfly/sys/kern/sys_pipe.c (revision 92fc8b5c)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22 
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61 
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64 
65 #include <machine/cpufunc.h>
66 
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio,
71 		struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio,
73 		struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79 		struct ucred *cred, struct sysmsg *msg);
80 
81 static struct fileops pipeops = {
82 	.fo_read = pipe_read,
83 	.fo_write = pipe_write,
84 	.fo_ioctl = pipe_ioctl,
85 	.fo_kqfilter = pipe_kqfilter,
86 	.fo_stat = pipe_stat,
87 	.fo_close = pipe_close,
88 	.fo_shutdown = pipe_shutdown
89 };
90 
91 static void	filt_pipedetach(struct knote *kn);
92 static int	filt_piperead(struct knote *kn, long hint);
93 static int	filt_pipewrite(struct knote *kn, long hint);
94 
95 static struct filterops pipe_rfiltops =
96 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99 
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101 
102 /*
103  * Default pipe buffer size(s).  This can be fairly large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110 
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES	64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116 
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125 
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times a pipe read blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times a pipe write blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;	/* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150 
151 static void pipeclose (struct pipe *cpipe);
152 static void pipe_free_kmem (struct pipe *cpipe);
153 static int pipe_create (struct pipe **cpipep);
154 static int pipespace (struct pipe *cpipe, int size);
155 
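/*
 * Notify anyone waiting on the pipe: post the kqueue/select/poll note
 * and, if the pipe is in async (O_ASYNC/PIPE_ASYNC) mode with an owner
 * registered, deliver SIGIO.  The proc_token is taken around pgsigio(),
 * which appears to be what serializes the sigio state here.
 */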
156 static __inline void
157 pipewakeup(struct pipe *cpipe, int dosigio)
158 {
159 	if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
160 		lwkt_gettoken(&proc_token);
161 		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
162 		lwkt_reltoken(&proc_token);
163 	}
164 	KNOTE(&cpipe->pipe_kq.ki_note, 0);
165 }
166 
167 /*
168  * These routines are called before and after a UIO.  The UIO
169  * may block, causing our held tokens to be lost temporarily.
170  *
171  * We use these routines to serialize reads against other reads
172  * and writes against other writes.
173  *
174  * The read token is held on entry so *ipp does not race.
175  */
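/*
 * A sketch of the serialization word (*ipp), inferred from the code
 * below:
 *
 *	 0	no uio in progress
 *	 1	a uio is in progress
 *	-1	a uio is in progress and at least one thread is sleeping
 *		on ipp waiting for it to complete
 *
 * pipe_start_uio() sleeps while the word is non-zero, first marking it
 * -1 so the current owner knows to wakeup() waiters in pipe_end_uio().
 */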
176 static __inline int
177 pipe_start_uio(struct pipe *cpipe, int *ipp)
178 {
179 	int error;
180 
181 	while (*ipp) {
182 		*ipp = -1;
183 		error = tsleep(ipp, PCATCH, "pipexx", 0);
184 		if (error)
185 			return (error);
186 	}
187 	*ipp = 1;
188 	return (0);
189 }
190 
191 static __inline void
192 pipe_end_uio(struct pipe *cpipe, int *ipp)
193 {
194 	if (*ipp < 0) {
195 		*ipp = 0;
196 		wakeup(ipp);
197 	} else {
198 		KKASSERT(*ipp > 0);
199 		*ipp = 0;
200 	}
201 }
202 
203 /*
204  * The pipe system call for the DTYPE_PIPE type of pipes
205  *
206  * pipe_args(int dummy)
207  *
208  * MPSAFE
209  */
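/*
 * Illustrative userland view (not part of this file): the descriptors
 * returned in sysmsg_fds[] are what pipe(2) hands back as fds[0] (the
 * read side, backed by rpipe) and fds[1] (the write side, backed by
 * wpipe):
 *
 *	int fds[2];
 *	char c;
 *
 *	if (pipe(fds) == 0) {
 *		write(fds[1], "x", 1);
 *		read(fds[0], &c, 1);
 *	}
 */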
210 int
211 sys_pipe(struct pipe_args *uap)
212 {
213 	struct thread *td = curthread;
214 	struct filedesc *fdp = td->td_proc->p_fd;
215 	struct file *rf, *wf;
216 	struct pipe *rpipe, *wpipe;
217 	int fd1, fd2, error;
218 
219 	rpipe = wpipe = NULL;
220 	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
221 		pipeclose(rpipe);
222 		pipeclose(wpipe);
223 		return (ENFILE);
224 	}
225 
226 	error = falloc(td->td_lwp, &rf, &fd1);
227 	if (error) {
228 		pipeclose(rpipe);
229 		pipeclose(wpipe);
230 		return (error);
231 	}
232 	uap->sysmsg_fds[0] = fd1;
233 
234 	/*
235 	 * Warning: once we've gotten past allocation of the fd for the
236 	 * read-side, we can only drop the read side via fdrop() in order
237 	 * to avoid races against processes which manage to dup() the read
238 	 * side while we are blocked trying to allocate the write side.
239 	 */
240 	rf->f_type = DTYPE_PIPE;
241 	rf->f_flag = FREAD | FWRITE;
242 	rf->f_ops = &pipeops;
243 	rf->f_data = rpipe;
244 	error = falloc(td->td_lwp, &wf, &fd2);
245 	if (error) {
246 		fsetfd(fdp, NULL, fd1);
247 		fdrop(rf);
248 		/* rpipe has been closed by fdrop(). */
249 		pipeclose(wpipe);
250 		return (error);
251 	}
252 	wf->f_type = DTYPE_PIPE;
253 	wf->f_flag = FREAD | FWRITE;
254 	wf->f_ops = &pipeops;
255 	wf->f_data = wpipe;
256 	uap->sysmsg_fds[1] = fd2;
257 
258 	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
259 				    M_PIPE, M_WAITOK|M_ZERO);
260 	wpipe->pipe_slock = rpipe->pipe_slock;
261 	rpipe->pipe_peer = wpipe;
262 	wpipe->pipe_peer = rpipe;
263 	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
264 
265 	/*
266 	 * Once activated the peer relationship remains valid until
267 	 * both sides are closed.
268 	 */
269 	fsetfd(fdp, rf, fd1);
270 	fsetfd(fdp, wf, fd2);
271 	fdrop(rf);
272 	fdrop(wf);
273 
274 	return (0);
275 }
276 
277 /*
278  * Allocate kva for the pipe circular buffer; the space is pageable.
279  * This routine will 'realloc' the size of a pipe safely.  If the
280  * reallocation fails the old buffer is retained and ENOMEM is
281  * returned.
282  */
283 static int
284 pipespace(struct pipe *cpipe, int size)
285 {
286 	struct vm_object *object;
287 	caddr_t buffer;
288 	int npages, error;
289 
290 	npages = round_page(size) / PAGE_SIZE;
291 	object = cpipe->pipe_buffer.object;
292 
293 	/*
294 	 * [re]create the object if necessary and reserve space for it
295 	 * in the kernel_map.  The object and memory are pageable.  On
296 	 * success, free the old resources before assigning the new
297 	 * ones.
298 	 */
299 	if (object == NULL || object->size != npages) {
300 		object = vm_object_allocate(OBJT_DEFAULT, npages);
301 		buffer = (caddr_t)vm_map_min(&kernel_map);
302 
303 		error = vm_map_find(&kernel_map, object, 0,
304 				    (vm_offset_t *)&buffer,
305 				    size, PAGE_SIZE,
306 				    1, VM_MAPTYPE_NORMAL,
307 				    VM_PROT_ALL, VM_PROT_ALL,
308 				    0);
309 
310 		if (error != KERN_SUCCESS) {
311 			vm_object_deallocate(object);
312 			return (ENOMEM);
313 		}
314 		pipe_free_kmem(cpipe);
315 		cpipe->pipe_buffer.object = object;
316 		cpipe->pipe_buffer.buffer = buffer;
317 		cpipe->pipe_buffer.size = size;
318 		++pipe_bkmem_alloc;
319 	} else {
320 		++pipe_bcache_alloc;
321 	}
322 	cpipe->pipe_buffer.rindex = 0;
323 	cpipe->pipe_buffer.windex = 0;
324 	return (0);
325 }
326 
327 /*
328  * Initialize and allocate VM and memory for pipe, pulling the pipe from
329  * our per-cpu cache if possible.  For now make sure it is sized for the
330  * smaller PIPE_SIZE default.
331  */
332 static int
333 pipe_create(struct pipe **cpipep)
334 {
335 	globaldata_t gd = mycpu;
336 	struct pipe *cpipe;
337 	int error;
338 
339 	if ((cpipe = gd->gd_pipeq) != NULL) {
340 		gd->gd_pipeq = cpipe->pipe_peer;
341 		--gd->gd_pipeqcount;
342 		cpipe->pipe_peer = NULL;
343 		cpipe->pipe_wantwcnt = 0;
344 	} else {
345 		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
346 	}
347 	*cpipep = cpipe;
348 	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
349 		return (error);
350 	vfs_timestamp(&cpipe->pipe_ctime);
351 	cpipe->pipe_atime = cpipe->pipe_ctime;
352 	cpipe->pipe_mtime = cpipe->pipe_ctime;
353 	lwkt_token_init(&cpipe->pipe_rlock, "piper");
354 	lwkt_token_init(&cpipe->pipe_wlock, "pipew");
355 	return (0);
356 }
357 
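/*
 * A sketch of the ring buffer indexing used by the read and write paths
 * (inferred from the code): windex and rindex are free-running byte
 * counters.  Only the writer advances windex and only the reader
 * advances rindex, so the bytes in the FIFO are (windex - rindex) and
 * the physical read offset is (rindex & (size - 1)); the buffer sizes
 * used here are powers of 2.  For example, with a 16384 byte buffer,
 * windex 20000 and rindex 18000 there are 2000 bytes available starting
 * at offset 18000 & 16383 == 1616.  The cpu_lfence()/cpu_mfence() calls
 * order the index loads and stores against the data copies so that each
 * side can usually run while holding only its own token.
 */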
358 static int
359 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
360 {
361 	struct pipe *rpipe;
362 	struct pipe *wpipe;
363 	int error;
364 	size_t nread = 0;
365 	int nbio;
366 	u_int size;	/* total bytes available */
367 	u_int nsize;	/* total bytes to read */
368 	u_int rindex;	/* contiguous bytes available */
369 	int notify_writer;
370 	int bigread;
371 	int bigcount;
372 
373 	if (uio->uio_resid == 0)
374 		return(0);
375 
376 	/*
377	 * Set up locks and calculate nbio
378 	 */
379 	rpipe = (struct pipe *)fp->f_data;
380 	wpipe = rpipe->pipe_peer;
381 	lwkt_gettoken(&rpipe->pipe_rlock);
382 
383 	if (fflags & O_FBLOCKING)
384 		nbio = 0;
385 	else if (fflags & O_FNONBLOCKING)
386 		nbio = 1;
387 	else if (fp->f_flag & O_NONBLOCK)
388 		nbio = 1;
389 	else
390 		nbio = 0;
391 
392 	/*
393 	 * Reads are serialized.  Note however that pipe_buffer.buffer and
394 	 * pipe_buffer.size can change out from under us when the number
395	 * of bytes in the buffer is zero due to the write-side doing a
396 	 * pipespace().
397 	 */
398 	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
399 	if (error) {
400 		lwkt_reltoken(&rpipe->pipe_rlock);
401 		return (error);
402 	}
403 	notify_writer = 0;
404 
405 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
406 	bigcount = 10;
407 
408 	while (uio->uio_resid) {
409 		/*
410 		 * Don't hog the cpu.
411 		 */
412 		if (bigread && --bigcount == 0) {
413 			lwkt_user_yield();
414 			bigcount = 10;
415 			if (CURSIG(curthread->td_lwp)) {
416 				error = EINTR;
417 				break;
418 			}
419 		}
420 
421 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
422 		cpu_lfence();
423 		if (size) {
424 			rindex = rpipe->pipe_buffer.rindex &
425 				 (rpipe->pipe_buffer.size - 1);
426 			nsize = size;
427 			if (nsize > rpipe->pipe_buffer.size - rindex)
428 				nsize = rpipe->pipe_buffer.size - rindex;
429 			nsize = szmin(nsize, uio->uio_resid);
430 
431 			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
432 					nsize, uio);
433 			if (error)
434 				break;
435 			cpu_mfence();
436 			rpipe->pipe_buffer.rindex += nsize;
437 			nread += nsize;
438 
439 			/*
440 			 * If the FIFO is still over half full just continue
441 			 * and do not try to notify the writer yet.
442 			 */
443 			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
444 				notify_writer = 0;
445 				continue;
446 			}
447 
448 			/*
449			 * When the FIFO is less than half full notify any
450 			 * waiting writer.  WANTW can be checked while
451 			 * holding just the rlock.
452 			 */
453 			notify_writer = 1;
454 			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
455 				continue;
456 		}
457 
458 		/*
459 		 * If the "write-side" was blocked we wake it up.  This code
460 		 * is reached either when the buffer is completely emptied
461		 * or if it becomes more than half-empty.
462 		 *
463 		 * Pipe_state can only be modified if both the rlock and
464 		 * wlock are held.
465 		 */
466 		if (rpipe->pipe_state & PIPE_WANTW) {
467 			lwkt_gettoken(&rpipe->pipe_wlock);
468 			if (rpipe->pipe_state & PIPE_WANTW) {
469 				rpipe->pipe_state &= ~PIPE_WANTW;
470 				lwkt_reltoken(&rpipe->pipe_wlock);
471 				wakeup(rpipe);
472 			} else {
473 				lwkt_reltoken(&rpipe->pipe_wlock);
474 			}
475 		}
476 
477 		/*
478 		 * Pick up our copy loop again if the writer sent data to
479 		 * us while we were messing around.
480 		 *
481		 * On an SMP box poll up to pipe_delay nanoseconds for new
482 		 * data.  Typically a value of 2000 to 4000 is sufficient
483 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
484 		 * is used for synchronous communications with small packets,
485 		 * and 8000 or so (8uS) will pipeline large buffer xfers
486 		 * between cpus over a pipe.
487 		 *
488 		 * For synchronous communications a hit means doing a
489		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
490		 * whereas a miss requiring a tsleep/wakeup sequence
491 		 * will take 7uS or more.
492 		 */
493 		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
494 			continue;
495 
496 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
497 		if (pipe_delay) {
498 			int64_t tsc_target;
499 			int good = 0;
500 
501 			tsc_target = tsc_get_target(pipe_delay);
502 			while (tsc_test_target(tsc_target) == 0) {
503 				if (rpipe->pipe_buffer.windex !=
504 				    rpipe->pipe_buffer.rindex) {
505 					good = 1;
506 					break;
507 				}
508 			}
509 			if (good)
510 				continue;
511 		}
512 #endif
513 
514 		/*
515 		 * Detect EOF condition, do not set error.
516 		 */
517 		if (rpipe->pipe_state & PIPE_REOF)
518 			break;
519 
520 		/*
521 		 * Break if some data was read, or if this was a non-blocking
522 		 * read.
523 		 */
524 		if (nread > 0)
525 			break;
526 
527 		if (nbio) {
528 			error = EAGAIN;
529 			break;
530 		}
531 
532 		/*
533 		 * Last chance, interlock with WANTR.
534 		 */
535 		lwkt_gettoken(&rpipe->pipe_wlock);
536 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
537 		if (size) {
538 			lwkt_reltoken(&rpipe->pipe_wlock);
539 			continue;
540 		}
541 
542 		/*
543 		 * Retest EOF - acquiring a new token can temporarily release
544 		 * tokens already held.
545 		 */
546 		if (rpipe->pipe_state & PIPE_REOF) {
547 			lwkt_reltoken(&rpipe->pipe_wlock);
548 			break;
549 		}
550 
551 		/*
552 		 * If there is no more to read in the pipe, reset its
553 		 * pointers to the beginning.  This improves cache hit
554 		 * stats.
555 		 *
556 		 * We need both locks to modify both pointers, and there
557 		 * must also not be a write in progress or the uiomove()
558 		 * in the write might block and temporarily release
559 		 * its wlock, then reacquire and update windex.  We are
560 		 * only serialized against reads, not writes.
561 		 *
562 		 * XXX should we even bother resetting the indices?  It
563 		 *     might actually be more cache efficient not to.
564 		 */
565 		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
566 		    rpipe->pipe_wip == 0) {
567 			rpipe->pipe_buffer.rindex = 0;
568 			rpipe->pipe_buffer.windex = 0;
569 		}
570 
571 		/*
572 		 * Wait for more data.
573 		 *
574 		 * Pipe_state can only be set if both the rlock and wlock
575 		 * are held.
576 		 */
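		/*
		 * The tsleep_interlock()/PINTERLOCKED pairing below places
		 * this thread on the sleep queue before the wlock is
		 * released, so a wakeup() issued by the writer in that
		 * window is not lost.
		 */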
577 		rpipe->pipe_state |= PIPE_WANTR;
578 		tsleep_interlock(rpipe, PCATCH);
579 		lwkt_reltoken(&rpipe->pipe_wlock);
580 		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
581 		++pipe_rblocked_count;
582 		if (error)
583 			break;
584 	}
585 	pipe_end_uio(rpipe, &rpipe->pipe_rip);
586 
587 	/*
588	 * Update last access time
589 	 */
590 	if (error == 0 && nread)
591 		vfs_timestamp(&rpipe->pipe_atime);
592 
593 	/*
594	 * If we drained the FIFO more than half way then handle
595 	 * write blocking hysteresis.
596 	 *
597 	 * Note that PIPE_WANTW cannot be set by the writer without
598 	 * it holding both rlock and wlock, so we can test it
599 	 * while holding just rlock.
600 	 */
601 	if (notify_writer) {
602 		/*
603 		 * Synchronous blocking is done on the pipe involved
604 		 */
605 		if (rpipe->pipe_state & PIPE_WANTW) {
606 			lwkt_gettoken(&rpipe->pipe_wlock);
607 			if (rpipe->pipe_state & PIPE_WANTW) {
608 				rpipe->pipe_state &= ~PIPE_WANTW;
609 				lwkt_reltoken(&rpipe->pipe_wlock);
610 				wakeup(rpipe);
611 			} else {
612 				lwkt_reltoken(&rpipe->pipe_wlock);
613 			}
614 		}
615 
616 		/*
617 		 * But we may also have to deal with a kqueue which is
618		 * stored on the same pipe as its descriptor, so an
619 		 * EVFILT_WRITE event waiting for our side to drain will
620 		 * be on the other side.
621 		 */
622 		lwkt_gettoken(&wpipe->pipe_wlock);
623 		pipewakeup(wpipe, 0);
624 		lwkt_reltoken(&wpipe->pipe_wlock);
625 	}
626 	/*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
627 	lwkt_reltoken(&rpipe->pipe_rlock);
628 
629 	return (error);
630 }
631 
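/*
 * The write side mirrors pipe_read(): the writer takes the peer's wlock,
 * serializes against other writers via pipe_wip, copies data into the
 * ring buffer and then advances windex.  Writes of PIPE_BUF bytes or
 * less are kept atomic, and the buffer may be grown to BIG_PIPE_SIZE
 * when writers keep blocking (the pipe_wantwcnt heuristic below).
 */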
632 static int
633 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
634 {
635 	int error;
636 	int orig_resid;
637 	int nbio;
638 	struct pipe *wpipe;
639 	struct pipe *rpipe;
640 	u_int windex;
641 	u_int space;
642 	u_int wcount;
643 	int bigwrite;
644 	int bigcount;
645 
646 	/*
647 	 * Writes go to the peer.  The peer will always exist.
648 	 */
649 	rpipe = (struct pipe *) fp->f_data;
650 	wpipe = rpipe->pipe_peer;
651 	lwkt_gettoken(&wpipe->pipe_wlock);
652 	if (wpipe->pipe_state & PIPE_WEOF) {
653 		lwkt_reltoken(&wpipe->pipe_wlock);
654 		return (EPIPE);
655 	}
656 
657 	/*
658	 * Degenerate case (EPIPE takes precedence)
659 	 */
660 	if (uio->uio_resid == 0) {
661 		lwkt_reltoken(&wpipe->pipe_wlock);
662 		return(0);
663 	}
664 
665 	/*
666 	 * Writes are serialized (start_uio must be called with wlock)
667 	 */
668 	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
669 	if (error) {
670 		lwkt_reltoken(&wpipe->pipe_wlock);
671 		return (error);
672 	}
673 
674 	if (fflags & O_FBLOCKING)
675 		nbio = 0;
676 	else if (fflags & O_FNONBLOCKING)
677 		nbio = 1;
678 	else if (fp->f_flag & O_NONBLOCK)
679 		nbio = 1;
680 	else
681 		nbio = 0;
682 
683 	/*
684 	 * If it is advantageous to resize the pipe buffer, do
685 	 * so.  We are write-serialized so we can block safely.
686 	 */
687 	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
688 	    (pipe_nbig < pipe_maxbig) &&
689 	    wpipe->pipe_wantwcnt > 4 &&
690 	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
691 		/*
692 		 * Recheck after lock.
693 		 */
694 		lwkt_gettoken(&wpipe->pipe_rlock);
695 		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
696 		    (pipe_nbig < pipe_maxbig) &&
697 		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
698 			atomic_add_int(&pipe_nbig, 1);
699 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
700 				++pipe_bigcount;
701 			else
702 				atomic_subtract_int(&pipe_nbig, 1);
703 		}
704 		lwkt_reltoken(&wpipe->pipe_rlock);
705 	}
706 
707 	orig_resid = uio->uio_resid;
708 	wcount = 0;
709 
710 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
711 	bigcount = 10;
712 
713 	while (uio->uio_resid) {
714 		if (wpipe->pipe_state & PIPE_WEOF) {
715 			error = EPIPE;
716 			break;
717 		}
718 
719 		/*
720 		 * Don't hog the cpu.
721 		 */
722 		if (bigwrite && --bigcount == 0) {
723 			lwkt_user_yield();
724 			bigcount = 10;
725 			if (CURSIG(curthread->td_lwp)) {
726 				error = EINTR;
727 				break;
728 			}
729 		}
730 
731 		windex = wpipe->pipe_buffer.windex &
732 			 (wpipe->pipe_buffer.size - 1);
733 		space = wpipe->pipe_buffer.size -
734 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
735 		cpu_lfence();
736 
737 		/* Writes of size <= PIPE_BUF must be atomic. */
738 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
739 			space = 0;
740 
741 		/*
742 		 * Write to fill, read size handles write hysteresis.  Also
743 		 * additional restrictions can cause select-based non-blocking
744 		 * writes to spin.
745 		 */
746 		if (space > 0) {
747 			u_int segsize;
748 
749 			/*
750 			 * Transfer size is minimum of uio transfer
751 			 * and free space in pipe buffer.
752 			 *
753			 * Limit each uiocopy to no more than PIPE_SIZE
754 			 * so we can keep the gravy train going on a
755 			 * SMP box.  This doubles the performance for
756 			 * write sizes > 16K.  Otherwise large writes
757 			 * wind up doing an inefficient synchronous
758 			 * ping-pong.
759 			 */
760 			space = szmin(space, uio->uio_resid);
761 			if (space > PIPE_SIZE)
762 				space = PIPE_SIZE;
763 
764 			/*
765 			 * First segment to transfer is minimum of
766 			 * transfer size and contiguous space in
767 			 * pipe buffer.  If first segment to transfer
768 			 * is less than the transfer size, we've got
769 			 * a wraparound in the buffer.
770 			 */
771 			segsize = wpipe->pipe_buffer.size - windex;
772 			if (segsize > space)
773 				segsize = space;
774 
775 #ifdef SMP
776 			/*
777 			 * If this is the first loop and the reader is
778 			 * blocked, do a preemptive wakeup of the reader.
779 			 *
780 			 * On SMP the IPI latency plus the wlock interlock
781 			 * on the reader side is the fastest way to get the
782 			 * reader going.  (The scheduler will hard loop on
783 			 * lock tokens).
784 			 *
785 			 * NOTE: We can't clear WANTR here without acquiring
786 			 * the rlock, which we don't want to do here!
787 			 */
788 			if ((wpipe->pipe_state & PIPE_WANTR))
789 				wakeup(wpipe);
790 #endif
791 
792 			/*
793 			 * Transfer segment, which may include a wrap-around.
794			 * Update windex to account for both segments in one go
795 			 * so the reader can read() the data atomically.
796 			 */
797 			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
798 					segsize, uio);
799 			if (error == 0 && segsize < space) {
800 				segsize = space - segsize;
801 				error = uiomove(&wpipe->pipe_buffer.buffer[0],
802 						segsize, uio);
803 			}
804 			if (error)
805 				break;
806 			cpu_mfence();
807 			wpipe->pipe_buffer.windex += space;
808 			wcount += space;
809 			continue;
810 		}
811 
812 		/*
813 		 * We need both the rlock and the wlock to interlock against
814 		 * the EOF, WANTW, and size checks, and to modify pipe_state.
815 		 *
816 		 * These are token locks so we do not have to worry about
817 		 * deadlocks.
818 		 */
819 		lwkt_gettoken(&wpipe->pipe_rlock);
820 
821 		/*
822 		 * If the "read-side" has been blocked, wake it up now
823 		 * and yield to let it drain synchronously rather
824		 * than block.
825 		 */
826 		if (wpipe->pipe_state & PIPE_WANTR) {
827 			wpipe->pipe_state &= ~PIPE_WANTR;
828 			wakeup(wpipe);
829 		}
830 
831 		/*
832 		 * don't block on non-blocking I/O
833 		 */
834 		if (nbio) {
835 			lwkt_reltoken(&wpipe->pipe_rlock);
836 			error = EAGAIN;
837 			break;
838 		}
839 
840 		/*
841 		 * re-test whether we have to block in the writer after
842 		 * acquiring both locks, in case the reader opened up
843 		 * some space.
844 		 */
845 		space = wpipe->pipe_buffer.size -
846 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
847 		cpu_lfence();
848 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
849 			space = 0;
850 
851 		/*
852 		 * Retest EOF - acquiring a new token can temporarily release
853 		 * tokens already held.
854 		 */
855 		if (wpipe->pipe_state & PIPE_WEOF) {
856 			lwkt_reltoken(&wpipe->pipe_rlock);
857 			error = EPIPE;
858 			break;
859 		}
860 
861 		/*
862 		 * We have no more space and have something to offer,
863 		 * wake up select/poll/kq.
864 		 */
865 		if (space == 0) {
866 			wpipe->pipe_state |= PIPE_WANTW;
867 			++wpipe->pipe_wantwcnt;
868 			pipewakeup(wpipe, 1);
869 			if (wpipe->pipe_state & PIPE_WANTW)
870 				error = tsleep(wpipe, PCATCH, "pipewr", 0);
871 			++pipe_wblocked_count;
872 		}
873 		lwkt_reltoken(&wpipe->pipe_rlock);
874 
875 		/*
876 		 * Break out if we errored or the read side wants us to go
877 		 * away.
878 		 */
879 		if (error)
880 			break;
881 		if (wpipe->pipe_state & PIPE_WEOF) {
882 			error = EPIPE;
883 			break;
884 		}
885 	}
886 	pipe_end_uio(wpipe, &wpipe->pipe_wip);
887 
888 	/*
889 	 * If we have put any characters in the buffer, we wake up
890 	 * the reader.
891 	 *
892 	 * Both rlock and wlock are required to be able to modify pipe_state.
893 	 */
894 	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
895 		if (wpipe->pipe_state & PIPE_WANTR) {
896 			lwkt_gettoken(&wpipe->pipe_rlock);
897 			if (wpipe->pipe_state & PIPE_WANTR) {
898 				wpipe->pipe_state &= ~PIPE_WANTR;
899 				lwkt_reltoken(&wpipe->pipe_rlock);
900 				wakeup(wpipe);
901 			} else {
902 				lwkt_reltoken(&wpipe->pipe_rlock);
903 			}
904 		}
905 		lwkt_gettoken(&wpipe->pipe_rlock);
906 		pipewakeup(wpipe, 1);
907 		lwkt_reltoken(&wpipe->pipe_rlock);
908 	}
909 
910 	/*
911 	 * Don't return EPIPE if I/O was successful
912 	 */
913 	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
914 	    (uio->uio_resid == 0) &&
915 	    (error == EPIPE)) {
916 		error = 0;
917 	}
918 
919 	if (error == 0)
920 		vfs_timestamp(&wpipe->pipe_mtime);
921 
922 	/*
923 	 * We have something to offer,
924 	 * wake up select/poll/kq.
925 	 */
926 	/*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
927 	lwkt_reltoken(&wpipe->pipe_wlock);
928 	return (error);
929 }
930 
931 /*
932  * We implement a very minimal set of ioctls for compatibility with sockets.
933  */
934 int
935 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
936 	   struct ucred *cred, struct sysmsg *msg)
937 {
938 	struct pipe *mpipe;
939 	int error;
940 
941 	mpipe = (struct pipe *)fp->f_data;
942 
943 	lwkt_gettoken(&mpipe->pipe_rlock);
944 	lwkt_gettoken(&mpipe->pipe_wlock);
945 
946 	switch (cmd) {
947 	case FIOASYNC:
948 		if (*(int *)data) {
949 			mpipe->pipe_state |= PIPE_ASYNC;
950 		} else {
951 			mpipe->pipe_state &= ~PIPE_ASYNC;
952 		}
953 		error = 0;
954 		break;
955 	case FIONREAD:
956 		*(int *)data = mpipe->pipe_buffer.windex -
957 				mpipe->pipe_buffer.rindex;
958 		error = 0;
959 		break;
960 	case FIOSETOWN:
961 		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
962 		break;
963 	case FIOGETOWN:
964 		*(int *)data = fgetown(&mpipe->pipe_sigio);
965 		error = 0;
966 		break;
967 	case TIOCSPGRP:
968 		/* This is deprecated, FIOSETOWN should be used instead. */
969 		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
970 		break;
971 
972 	case TIOCGPGRP:
973 		/* This is deprecated, FIOGETOWN should be used instead. */
974 		*(int *)data = -fgetown(&mpipe->pipe_sigio);
975 		error = 0;
976 		break;
977 	default:
978 		error = ENOTTY;
979 		break;
980 	}
981 	lwkt_reltoken(&mpipe->pipe_wlock);
982 	lwkt_reltoken(&mpipe->pipe_rlock);
983 
984 	return (error);
985 }
986 
987 /*
988  * MPSAFE
989  */
990 static int
991 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
992 {
993 	struct pipe *pipe;
994 
995 	pipe = (struct pipe *)fp->f_data;
996 
997 	bzero((caddr_t)ub, sizeof(*ub));
998 	ub->st_mode = S_IFIFO;
999 	ub->st_blksize = pipe->pipe_buffer.size;
1000 	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1001 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1002 	ub->st_atimespec = pipe->pipe_atime;
1003 	ub->st_mtimespec = pipe->pipe_mtime;
1004 	ub->st_ctimespec = pipe->pipe_ctime;
1005 	/*
1006 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1007 	 * st_flags, st_gen.
1008 	 * XXX (st_dev, st_ino) should be unique.
1009 	 */
1010 	return (0);
1011 }
1012 
1013 static int
1014 pipe_close(struct file *fp)
1015 {
1016 	struct pipe *cpipe;
1017 
1018 	cpipe = (struct pipe *)fp->f_data;
1019 	fp->f_ops = &badfileops;
1020 	fp->f_data = NULL;
1021 	funsetown(&cpipe->pipe_sigio);
1022 	pipeclose(cpipe);
1023 	return (0);
1024 }
1025 
1026 /*
1027  * Shutdown one or both directions of a full-duplex pipe.
1028  */
1029 static int
1030 pipe_shutdown(struct file *fp, int how)
1031 {
1032 	struct pipe *rpipe;
1033 	struct pipe *wpipe;
1034 	int error = EPIPE;
1035 
1036 	rpipe = (struct pipe *)fp->f_data;
1037 	wpipe = rpipe->pipe_peer;
1038 
1039 	/*
1040 	 * We modify pipe_state on both pipes, which means we need
1041 	 * all four tokens!
1042 	 */
1043 	lwkt_gettoken(&rpipe->pipe_rlock);
1044 	lwkt_gettoken(&rpipe->pipe_wlock);
1045 	lwkt_gettoken(&wpipe->pipe_rlock);
1046 	lwkt_gettoken(&wpipe->pipe_wlock);
1047 
1048 	switch(how) {
1049 	case SHUT_RDWR:
1050 	case SHUT_RD:
1051 		rpipe->pipe_state |= PIPE_REOF;		/* my reads */
1052 		rpipe->pipe_state |= PIPE_WEOF;		/* peer writes */
1053 		if (rpipe->pipe_state & PIPE_WANTR) {
1054 			rpipe->pipe_state &= ~PIPE_WANTR;
1055 			wakeup(rpipe);
1056 		}
1057 		if (rpipe->pipe_state & PIPE_WANTW) {
1058 			rpipe->pipe_state &= ~PIPE_WANTW;
1059 			wakeup(rpipe);
1060 		}
1061 		error = 0;
1062 		if (how == SHUT_RD)
1063 			break;
1064 		/* fall through */
1065 	case SHUT_WR:
1066 		wpipe->pipe_state |= PIPE_REOF;		/* peer reads */
1067 		wpipe->pipe_state |= PIPE_WEOF;		/* my writes */
1068 		if (wpipe->pipe_state & PIPE_WANTR) {
1069 			wpipe->pipe_state &= ~PIPE_WANTR;
1070 			wakeup(wpipe);
1071 		}
1072 		if (wpipe->pipe_state & PIPE_WANTW) {
1073 			wpipe->pipe_state &= ~PIPE_WANTW;
1074 			wakeup(wpipe);
1075 		}
1076 		error = 0;
1077 		break;
1078 	}
1079 	pipewakeup(rpipe, 1);
1080 	pipewakeup(wpipe, 1);
1081 
1082 	lwkt_reltoken(&wpipe->pipe_wlock);
1083 	lwkt_reltoken(&wpipe->pipe_rlock);
1084 	lwkt_reltoken(&rpipe->pipe_wlock);
1085 	lwkt_reltoken(&rpipe->pipe_rlock);
1086 
1087 	return (error);
1088 }
1089 
1090 static void
1091 pipe_free_kmem(struct pipe *cpipe)
1092 {
1093 	if (cpipe->pipe_buffer.buffer != NULL) {
1094 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1095 			atomic_subtract_int(&pipe_nbig, 1);
1096 		kmem_free(&kernel_map,
1097 			(vm_offset_t)cpipe->pipe_buffer.buffer,
1098 			cpipe->pipe_buffer.size);
1099 		cpipe->pipe_buffer.buffer = NULL;
1100 		cpipe->pipe_buffer.object = NULL;
1101 	}
1102 }
1103 
1104 /*
1105  * Close the pipe.  The slock must be held to interlock against simultaneous
1106  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1107  */
1108 static void
1109 pipeclose(struct pipe *cpipe)
1110 {
1111 	globaldata_t gd;
1112 	struct pipe *ppipe;
1113 
1114 	if (cpipe == NULL)
1115 		return;
1116 
1117 	/*
1118 	 * The slock may not have been allocated yet (close during
1119 	 * initialization)
1120 	 *
1121 	 * We need both the read and write tokens to modify pipe_state.
1122 	 */
1123 	if (cpipe->pipe_slock)
1124 		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1125 	lwkt_gettoken(&cpipe->pipe_rlock);
1126 	lwkt_gettoken(&cpipe->pipe_wlock);
1127 
1128 	/*
1129 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1130 	 * wakeup anyone blocked on our pipe.
1131 	 */
1132 	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1133 	pipewakeup(cpipe, 1);
1134 	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1135 		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1136 		wakeup(cpipe);
1137 	}
1138 
1139 	/*
1140 	 * Disconnect from peer.
1141 	 */
1142 	if ((ppipe = cpipe->pipe_peer) != NULL) {
1143 		lwkt_gettoken(&ppipe->pipe_rlock);
1144 		lwkt_gettoken(&ppipe->pipe_wlock);
1145 		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1146 		pipewakeup(ppipe, 1);
1147 		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1148 			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1149 			wakeup(ppipe);
1150 		}
1151 		if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1152 			KNOTE(&ppipe->pipe_kq.ki_note, 0);
1153 		lwkt_reltoken(&ppipe->pipe_wlock);
1154 		lwkt_reltoken(&ppipe->pipe_rlock);
1155 	}
1156 
1157 	/*
1158 	 * If the peer is also closed we can free resources for both
1159 	 * sides, otherwise we leave our side intact to deal with any
1160 	 * races (since we only have the slock).
1161 	 */
1162 	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1163 		cpipe->pipe_peer = NULL;
1164 		ppipe->pipe_peer = NULL;
1165 		ppipe->pipe_slock = NULL;	/* we will free the slock */
1166 		pipeclose(ppipe);
1167 		ppipe = NULL;
1168 	}
1169 
1170 	lwkt_reltoken(&cpipe->pipe_wlock);
1171 	lwkt_reltoken(&cpipe->pipe_rlock);
1172 	if (cpipe->pipe_slock)
1173 		lockmgr(cpipe->pipe_slock, LK_RELEASE);
1174 
1175 	/*
1176 	 * If we disassociated from our peer we can free resources
1177 	 */
1178 	if (ppipe == NULL) {
1179 		gd = mycpu;
1180 		if (cpipe->pipe_slock) {
1181 			kfree(cpipe->pipe_slock, M_PIPE);
1182 			cpipe->pipe_slock = NULL;
1183 		}
1184 		if (gd->gd_pipeqcount >= pipe_maxcache ||
1185 		    cpipe->pipe_buffer.size != PIPE_SIZE
1186 		) {
1187 			pipe_free_kmem(cpipe);
1188 			kfree(cpipe, M_PIPE);
1189 		} else {
1190 			cpipe->pipe_state = 0;
1191 			cpipe->pipe_peer = gd->gd_pipeq;
1192 			gd->gd_pipeq = cpipe;
1193 			++gd->gd_pipeqcount;
1194 		}
1195 	}
1196 }
1197 
1198 static int
1199 pipe_kqfilter(struct file *fp, struct knote *kn)
1200 {
1201 	struct pipe *cpipe;
1202 
1203 	cpipe = (struct pipe *)kn->kn_fp->f_data;
1204 
1205 	switch (kn->kn_filter) {
1206 	case EVFILT_READ:
1207 		kn->kn_fop = &pipe_rfiltops;
1208 		break;
1209 	case EVFILT_WRITE:
1210 		kn->kn_fop = &pipe_wfiltops;
1211 		if (cpipe->pipe_peer == NULL) {
1212 			/* other end of pipe has been closed */
1213 			return (EPIPE);
1214 		}
1215 		break;
1216 	default:
1217 		return (EOPNOTSUPP);
1218 	}
1219 	kn->kn_hook = (caddr_t)cpipe;
1220 
1221 	knote_insert(&cpipe->pipe_kq.ki_note, kn);
1222 
1223 	return (0);
1224 }
1225 
1226 static void
1227 filt_pipedetach(struct knote *kn)
1228 {
1229 	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1230 
1231 	knote_remove(&cpipe->pipe_kq.ki_note, kn);
1232 }
1233 
1234 /*ARGSUSED*/
1235 static int
1236 filt_piperead(struct knote *kn, long hint)
1237 {
1238 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1239 	int ready = 0;
1240 
1241 	lwkt_gettoken(&rpipe->pipe_rlock);
1242 	lwkt_gettoken(&rpipe->pipe_wlock);
1243 
1244 	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1245 
1246 	/*
1247 	 * Only set EOF if all data has been exhausted
1248 	 */
1249 	if ((rpipe->pipe_state & PIPE_REOF) && kn->kn_data == 0) {
1250 		kn->kn_flags |= EV_EOF;
1251 		ready = 1;
1252 	}
1253 
1254 	lwkt_reltoken(&rpipe->pipe_wlock);
1255 	lwkt_reltoken(&rpipe->pipe_rlock);
1256 
1257 	if (!ready)
1258 		ready = kn->kn_data > 0;
1259 
1260 	return (ready);
1261 }
1262 
1263 /*ARGSUSED*/
1264 static int
1265 filt_pipewrite(struct knote *kn, long hint)
1266 {
1267 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1268 	struct pipe *wpipe = rpipe->pipe_peer;
1269 	int ready = 0;
1270 
1271 	kn->kn_data = 0;
1272 	if (wpipe == NULL) {
1273 		kn->kn_flags |= EV_EOF;
1274 		return (1);
1275 	}
1276 
1277 	lwkt_gettoken(&wpipe->pipe_rlock);
1278 	lwkt_gettoken(&wpipe->pipe_wlock);
1279 
1280 	if (wpipe->pipe_state & PIPE_WEOF) {
1281 		kn->kn_flags |= EV_EOF;
1282 		ready = 1;
1283 	}
1284 
1285 	if (!ready)
1286 		kn->kn_data = wpipe->pipe_buffer.size -
1287 			      (wpipe->pipe_buffer.windex -
1288 			       wpipe->pipe_buffer.rindex);
1289 
1290 	lwkt_reltoken(&wpipe->pipe_wlock);
1291 	lwkt_reltoken(&wpipe->pipe_rlock);
1292 
1293 	if (!ready)
1294 		ready = kn->kn_data >= PIPE_BUF;
1295 
1296 	return (ready);
1297 }
1298