xref: /dragonfly/sys/kern/sys_pipe.c (revision c9c5aa9e)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23 
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysmsg.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68 
69 #include <machine/cpufunc.h>
70 
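/*
 * Per-cpu lock, cache-line aligned to avoid false sharing.  pipeclose()
 * uses these to limit the number of threads concurrently tearing down
 * pipe KVA in the kernel_map (see the comments in pipeclose()).
 */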
71 struct pipegdlock {
72 	struct mtx	mtx;
73 } __cachealign;
74 
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 		struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 		struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 		struct ucred *cred, struct sysmsg *msg);
88 
89 __read_mostly static struct fileops pipeops = {
90 	.fo_read = pipe_read,
91 	.fo_write = pipe_write,
92 	.fo_ioctl = pipe_ioctl,
93 	.fo_kqfilter = pipe_kqfilter,
94 	.fo_stat = pipe_stat,
95 	.fo_close = pipe_close,
96 	.fo_shutdown = pipe_shutdown
97 };
98 
99 static void	filt_pipedetach(struct knote *kn);
100 static int	filt_piperead(struct knote *kn, long hint);
101 static int	filt_pipewrite(struct knote *kn, long hint);
102 
103 __read_mostly static struct filterops pipe_rfiltops =
104 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 __read_mostly static struct filterops pipe_wfiltops =
106 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107 
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109 
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111 
112 __read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 __read_mostly static struct pipegdlock *pipe_gdlocks;
114 
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118 
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 __read_mostly static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127 
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the writer completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 __read_mostly static int pipe_delay = 4000;	/* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143 
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151 	size_t mbytes = kmem_lim_size();
152 	int n;
153 
154 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155 		if (mbytes >= 7 * 1024)
156 			pipe_maxcache *= 2;
157 		if (mbytes >= 15 * 1024)
158 			pipe_maxcache *= 2;
159 	}
160 
161 	/*
162 	 * Detune the pcpu caching a bit on systems with an insane number
163 	 * of cpu threads to reduce memory waste.
164 	 */
165 	if (ncpus > 64) {
166 		pipe_maxcache = pipe_maxcache * 64 / ncpus;
167 		if (pipe_maxcache < PIPEQ_MAX_CACHE)
168 			pipe_maxcache = PIPEQ_MAX_CACHE;
169 	}
170 
171 	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
172 			     M_PIPE, M_WAITOK | M_ZERO);
173 	for (n = 0; n < ncpus; ++n)
174 		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
175 }
176 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
177 
178 static void pipeclose (struct pipe *pipe,
179 		struct pipebuf *pbr, struct pipebuf *pbw);
180 static void pipe_free_kmem (struct pipebuf *buf);
181 static int pipe_create (struct pipe **pipep);
182 
183 /*
184  * Test and clear the specified flag, wakeup(pb) if it was set.
185  * This function must also act as a memory barrier.
186  */
187 static __inline void
188 pipesignal(struct pipebuf *pb, uint32_t flags)
189 {
190 	uint32_t oflags;
191 	uint32_t nflags;
192 
193 	for (;;) {
194 		oflags = pb->state;
195 		cpu_ccfence();
196 		nflags = oflags & ~flags;
197 		if (atomic_cmpset_int(&pb->state, oflags, nflags))
198 			break;
199 	}
200 	if (oflags & flags)
201 		wakeup(pb);
202 }
203 
204 /*
205  * Wakeup select/poll/kq waiters and, if requested, post SIGIO (PIPE_ASYNC).
206  */
207 static __inline void
208 pipewakeup(struct pipebuf *pb, int dosigio)
209 {
210 	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
211 		lwkt_gettoken(&sigio_token);
212 		pgsigio(pb->sigio, SIGIO, 0);
213 		lwkt_reltoken(&sigio_token);
214 	}
215 	KNOTE(&pb->kq.ki_note, 0);
216 }
217 
218 /*
219  * These routines are called before and after a UIO.  The UIO
220  * may block, causing our held tokens to be lost temporarily.
221  *
222  * We use these routines to serialize reads against other reads
223  * and writes against other writes.
224  *
225  * The appropriate token is held on entry so *ipp does not race.
226  */
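/*
 * (*ipp) is 0 when no uio is in progress, 1 while a uio is in progress,
 * and -1 while a uio is in progress and another thread is waiting for
 * it to finish.
 */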
227 static __inline int
228 pipe_start_uio(int *ipp)
229 {
230 	int error;
231 
232 	while (*ipp) {
233 		*ipp = -1;
234 		error = tsleep(ipp, PCATCH, "pipexx", 0);
235 		if (error)
236 			return (error);
237 	}
238 	*ipp = 1;
239 	return (0);
240 }
241 
242 static __inline void
243 pipe_end_uio(int *ipp)
244 {
245 	if (*ipp < 0) {
246 		*ipp = 0;
247 		wakeup(ipp);
248 	} else {
249 		KKASSERT(*ipp > 0);
250 		*ipp = 0;
251 	}
252 }
253 
254 /*
255  * The pipe system call for the DTYPE_PIPE type of pipes
256  *
257  * pipe_args(int dummy)
258  *
259  * MPSAFE
260  */
261 int
262 sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
263 {
264 	return kern_pipe(sysmsg->sysmsg_fds, 0);
265 }
266 
267 int
268 sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
269 {
270 	return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
271 }
272 
273 int
274 kern_pipe(long *fds, int flags)
275 {
276 	struct thread *td = curthread;
277 	struct filedesc *fdp = td->td_proc->p_fd;
278 	struct file *rf, *wf;
279 	struct pipe *pipe;
280 	int fd1, fd2, error;
281 
282 	pipe = NULL;
283 	if (pipe_create(&pipe)) {
284 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
285 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
286 		return (ENFILE);
287 	}
288 
289 	error = falloc(td->td_lwp, &rf, &fd1);
290 	if (error) {
291 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
292 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
293 		return (error);
294 	}
295 	fds[0] = fd1;
296 
297 	/*
298 	 * Warning: once we've gotten past allocation of the fd for the
299 	 * read-side, we can only drop the read side via fdrop() in order
300 	 * to avoid races against processes which manage to dup() the read
301 	 * side while we are blocked trying to allocate the write side.
302 	 */
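	/*
	 * The low bit of f_data selects which pipebuf is the read side for
	 * a descriptor: fds[0] reads bufferA and writes bufferB, fds[1] the
	 * reverse.  Both files are opened FREAD|FWRITE because these pipes
	 * are full-duplex.
	 */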
303 	rf->f_type = DTYPE_PIPE;
304 	rf->f_flag = FREAD | FWRITE;
305 	rf->f_ops = &pipeops;
306 	rf->f_data = (void *)((intptr_t)pipe | 0);
307 	if (flags & O_NONBLOCK)
308 		rf->f_flag |= O_NONBLOCK;
309 	if (flags & O_CLOEXEC)
310 		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
311 
312 	error = falloc(td->td_lwp, &wf, &fd2);
313 	if (error) {
314 		fsetfd(fdp, NULL, fd1);
315 		fdrop(rf);
316 		/* pipeA has been closed by fdrop() */
317 		/* close pipeB here */
318 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
319 		return (error);
320 	}
321 	wf->f_type = DTYPE_PIPE;
322 	wf->f_flag = FREAD | FWRITE;
323 	wf->f_ops = &pipeops;
324 	wf->f_data = (void *)((intptr_t)pipe | 1);
325 	if (flags & O_NONBLOCK)
326 		wf->f_flag |= O_NONBLOCK;
327 	if (flags & O_CLOEXEC)
328 		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
329 
330 	fds[1] = fd2;
331 
332 	/*
333 	 * Once activated the peer relationship remains valid until
334 	 * both sides are closed.
335 	 */
336 	fsetfd(fdp, rf, fd1);
337 	fsetfd(fdp, wf, fd2);
338 	fdrop(rf);
339 	fdrop(wf);
340 
341 	return (0);
342 }
343 
344 /*
345  * [re]allocates KVA for the pipe's circular buffer.  The space is
346  * pageable.  Called twice to setup full-duplex communications.
347  *
348  * NOTE: Independent vm_object's are used to improve performance.
349  *
350  * Returns 0 on success, ENOMEM on failure.
351  */
352 static int
353 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
354 {
355 	struct vm_object *object;
356 	caddr_t buffer;
357 	vm_pindex_t npages;
358 	int error;
359 
360 	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
361 	if (size < 16384)
362 		size = 16384;
363 	if (size > 1024*1024)
364 		size = 1024*1024;
365 
366 	npages = round_page(size) / PAGE_SIZE;
367 	object = pb->object;
368 
369 	/*
370 	 * [re]create the object if necessary and reserve space for it
371 	 * in the kernel_map.  The object and memory are pageable.  On
372 	 * success, free the old resources before assigning the new
373 	 * ones.
374 	 */
375 	if (object == NULL || object->size != npages) {
376 		object = vm_object_allocate(OBJT_DEFAULT, npages);
377 		buffer = (caddr_t)vm_map_min(&kernel_map);
378 
379 		error = vm_map_find(&kernel_map, object, NULL,
380 				    0, (vm_offset_t *)&buffer, size,
381 				    PAGE_SIZE, TRUE,
382 				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
383 				    VM_PROT_ALL, VM_PROT_ALL, 0);
384 
385 		if (error != KERN_SUCCESS) {
386 			vm_object_deallocate(object);
387 			return (ENOMEM);
388 		}
389 		pipe_free_kmem(pb);
390 		pb->object = object;
391 		pb->buffer = buffer;
392 		pb->size = size;
393 	}
394 	pb->rindex = 0;
395 	pb->windex = 0;
396 
397 	return (0);
398 }
399 
400 /*
401  * Initialize and allocate VM and memory for pipe, pulling the pipe from
402  * our per-cpu cache if possible.
403  *
404  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
405  * must still deallocate the pipe on failure.
406  */
407 static int
408 pipe_create(struct pipe **pipep)
409 {
410 	globaldata_t gd = mycpu;
411 	struct pipe *pipe;
412 	int error;
413 
414 	if ((pipe = gd->gd_pipeq) != NULL) {
415 		gd->gd_pipeq = pipe->next;
416 		--gd->gd_pipeqcount;
417 		pipe->next = NULL;
418 	} else {
419 		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
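		/*
		 * Assign a pseudo inode number for pipe_stat().  Spreading
		 * the per-cpu counter by ncpus keeps it unique system-wide;
		 * the +2 presumably avoids reserved low inode values.
		 */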
420 		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
421 		lwkt_token_init(&pipe->bufferA.rlock, "piper");
422 		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
423 		lwkt_token_init(&pipe->bufferB.rlock, "piper");
424 		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
425 	}
426 	*pipep = pipe;
427 	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
428 		return (error);
429 	}
430 	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
431 		return (error);
432 	}
433 	vfs_timestamp(&pipe->ctime);
434 	pipe->bufferA.atime = pipe->ctime;
435 	pipe->bufferA.mtime = pipe->ctime;
436 	pipe->bufferB.atime = pipe->ctime;
437 	pipe->bufferB.mtime = pipe->ctime;
438 	pipe->open_count = 2;
439 
440 	return (0);
441 }
442 
443 /*
444  * Read data from a pipe
445  */
446 static int
447 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
448 {
449 	struct pipebuf *rpb;
450 	struct pipebuf *wpb;
451 	struct pipe *pipe;
452 	size_t nread = 0;
453 	size_t size;	/* total bytes available */
454 	size_t nsize;	/* total bytes to read */
455 	size_t rindex;	/* contiguous bytes available */
456 	int notify_writer;
457 	int bigread;
458 	int bigcount;
459 	int error;
460 	int nbio;
461 
462 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
463 	if ((intptr_t)fp->f_data & 1) {
464 		rpb = &pipe->bufferB;
465 		wpb = &pipe->bufferA;
466 	} else {
467 		rpb = &pipe->bufferA;
468 		wpb = &pipe->bufferB;
469 	}
470 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
471 
472 	if (uio->uio_resid == 0)
473 		return(0);
474 
475 	/*
476 	 * Calculate nbio
477 	 */
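	/* (per-uio O_FBLOCKING/O_FNONBLOCKING override the fp's O_NONBLOCK) */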
478 	if (fflags & O_FBLOCKING)
479 		nbio = 0;
480 	else if (fflags & O_FNONBLOCKING)
481 		nbio = 1;
482 	else if (fp->f_flag & O_NONBLOCK)
483 		nbio = 1;
484 	else
485 		nbio = 0;
486 
487 	/*
488 	 * 'quick' NBIO test before things get expensive.
489 	 */
490 	if (nbio && rpb->rindex == rpb->windex &&
491 	    (rpb->state & PIPE_REOF) == 0) {
492 		return EAGAIN;
493 	}
494 
495 	/*
496 	 * Reads are serialized.  Note however that buffer.buffer and
497 	 * buffer.size can change out from under us when the number
498 	 * of bytes in the buffer is zero due to the write-side doing a
499 	 * pipespace().
500 	 */
501 	lwkt_gettoken(&rpb->rlock);
502 	error = pipe_start_uio(&rpb->rip);
503 	if (error) {
504 		lwkt_reltoken(&rpb->rlock);
505 		return (error);
506 	}
507 	notify_writer = 0;
508 
509 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
510 	bigcount = 10;
511 
512 	while (uio->uio_resid) {
513 		/*
514 		 * Don't hog the cpu.
515 		 */
516 		if (bigread && --bigcount == 0) {
517 			lwkt_user_yield();
518 			bigcount = 10;
519 			if (CURSIG(curthread->td_lwp)) {
520 				error = EINTR;
521 				break;
522 			}
523 		}
524 
525 		/*
526 		 * lfence required to avoid read-reordering of buffer
527 		 * contents prior to validation of size.
528 		 */
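		/*
		 * windex and rindex are free-running byte counters; their
		 * difference is the number of bytes in the FIFO and the ring
		 * offset is obtained by masking with (size - 1).
		 */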
529 		size = rpb->windex - rpb->rindex;
530 		cpu_lfence();
531 		if (size) {
532 			rindex = rpb->rindex & (rpb->size - 1);
533 			nsize = size;
534 			if (nsize > rpb->size - rindex)
535 				nsize = rpb->size - rindex;
536 			nsize = szmin(nsize, uio->uio_resid);
537 
538 			/*
539 			 * Limit how much we move in one go so we have a
540 			 * chance to kick the writer while data is still
541 			 * available in the pipe.  This avoids getting into
542 			 * a ping-pong with the writer.
543 			 */
544 			if (nsize > (rpb->size >> 1))
545 				nsize = rpb->size >> 1;
546 
547 			error = uiomove(&rpb->buffer[rindex], nsize, uio);
548 			if (error)
549 				break;
550 			rpb->rindex += nsize;
551 			nread += nsize;
552 
553 			/*
554 			 * If the FIFO is still over half full just continue
555 			 * and do not try to notify the writer yet.  If
556 			 * less than half full notify any waiting writer.
557 			 */
558 			if (size - nsize > (rpb->size >> 1)) {
559 				notify_writer = 0;
560 			} else {
561 				notify_writer = 1;
562 				pipesignal(rpb, PIPE_WANTW);
563 			}
564 			continue;
565 		}
566 
567 		/*
568 		 * If the "write-side" was blocked we wake it up.  This code
569 		 * is reached when the buffer is completely emptied.
570 		 */
571 		pipesignal(rpb, PIPE_WANTW);
572 
573 		/*
574 		 * Pick up our copy loop again if the writer sent data to
575 		 * us while we were messing around.
576 		 *
577 		 * On an SMP box poll up to pipe_delay nanoseconds for new
578 		 * data.  Typically a value of 2000 to 4000 is sufficient
579 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
580 		 * is used for synchronous communications with small packets,
581 		 * and 8000 or so (8uS) will pipeline large buffer xfers
582 		 * between cpus over a pipe.
583 		 *
584 		 * For synchronous communications a hit means doing a
585 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
586 		 * whereas a miss requiring a tsleep/wakeup sequence
587 		 * will take 7uS or more.
588 		 */
589 		if (rpb->windex != rpb->rindex)
590 			continue;
591 
592 #ifdef _RDTSC_SUPPORTED_
593 		if (pipe_delay) {
594 			int64_t tsc_target;
595 			int good = 0;
596 
597 			tsc_target = tsc_get_target(pipe_delay);
598 			while (tsc_test_target(tsc_target) == 0) {
599 				cpu_lfence();
600 				if (rpb->windex != rpb->rindex) {
601 					good = 1;
602 					break;
603 				}
604 				cpu_pause();
605 			}
606 			if (good)
607 				continue;
608 		}
609 #endif
610 
611 		/*
612 		 * Detect EOF condition, do not set error.
613 		 */
614 		if (rpb->state & PIPE_REOF)
615 			break;
616 
617 		/*
618 		 * Break if some data was read, or if this was a non-blocking
619 		 * read.
620 		 */
621 		if (nread > 0)
622 			break;
623 
624 		if (nbio) {
625 			error = EAGAIN;
626 			break;
627 		}
628 
629 		/*
630 		 * Last chance, interlock with WANTR
631 		 */
632 		tsleep_interlock(rpb, PCATCH);
633 		atomic_set_int(&rpb->state, PIPE_WANTR);
634 
635 		/*
636 		 * Retest bytes available after memory barrier above.
637 		 */
638 		size = rpb->windex - rpb->rindex;
639 		if (size)
640 			continue;
641 
642 		/*
643 		 * Retest EOF after memory barrier above.
644 		 */
645 		if (rpb->state & PIPE_REOF)
646 			break;
647 
648 		/*
649 		 * Wait for more data or state change
650 		 */
651 		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
652 		if (error)
653 			break;
654 	}
655 	pipe_end_uio(&rpb->rip);
656 
657 	/*
658 	 * Update last access time
659 	 */
660 	if (error == 0 && nread && rpb->lticks != ticks) {
661 		vfs_timestamp(&rpb->atime);
662 		rpb->lticks = ticks;
663 	}
664 
665 	/*
666 	 * If we drained the FIFO more than half way then handle
667 	 * write blocking hysteresis.
668 	 *
669 	 * Note that PIPE_WANTW cannot be set by the writer without
670 	 * it holding both rlock and wlock, so we can test it
671 	 * while holding just rlock.
672 	 */
673 	if (notify_writer) {
674 		/*
675 		 * Synchronous blocking is done on the pipe involved
676 		 */
677 		pipesignal(rpb, PIPE_WANTW);
678 
679 		/*
680 		 * But we may also have to deal with a kqueue which is
681 		 * stored on the same pipe as its descriptor, so an
682 		 * EVFILT_WRITE event waiting for our side to drain will
683 		 * be on the other side.
684 		 */
685 		pipewakeup(wpb, 0);
686 	}
687 	/*size = rpb->windex - rpb->rindex;*/
688 	lwkt_reltoken(&rpb->rlock);
689 
690 	return (error);
691 }
692 
693 static int
694 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
695 {
696 	struct pipebuf *rpb;
697 	struct pipebuf *wpb;
698 	struct pipe *pipe;
699 	size_t windex;
700 	size_t space;
701 	size_t wcount;
702 	size_t orig_resid;
703 	int bigwrite;
704 	int bigcount;
705 	int error;
706 	int nbio;
707 
708 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
709 	if ((intptr_t)fp->f_data & 1) {
710 		rpb = &pipe->bufferB;
711 		wpb = &pipe->bufferA;
712 	} else {
713 		rpb = &pipe->bufferA;
714 		wpb = &pipe->bufferB;
715 	}
716 
717 	/*
718 	 * Calculate nbio
719 	 */
720 	if (fflags & O_FBLOCKING)
721 		nbio = 0;
722 	else if (fflags & O_FNONBLOCKING)
723 		nbio = 1;
724 	else if (fp->f_flag & O_NONBLOCK)
725 		nbio = 1;
726 	else
727 		nbio = 0;
728 
729 	/*
730 	 * 'quick' NBIO test before things get expensive.
731 	 */
732 	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
733 	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
734 		return EAGAIN;
735 	}
736 
737 	/*
738 	 * Writes go to the peer.  The peer will always exist.
739 	 */
740 	lwkt_gettoken(&wpb->wlock);
741 	if (wpb->state & PIPE_WEOF) {
742 		lwkt_reltoken(&wpb->wlock);
743 		return (EPIPE);
744 	}
745 
746 	/*
747 	 * Degenerate case (EPIPE takes prec)
748 	 */
749 	if (uio->uio_resid == 0) {
750 		lwkt_reltoken(&wpb->wlock);
751 		return(0);
752 	}
753 
754 	/*
755 	 * Writes are serialized (start_uio must be called with wlock)
756 	 */
757 	error = pipe_start_uio(&wpb->wip);
758 	if (error) {
759 		lwkt_reltoken(&wpb->wlock);
760 		return (error);
761 	}
762 
763 	orig_resid = uio->uio_resid;
764 	wcount = 0;
765 
766 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
767 	bigcount = 10;
768 
769 	while (uio->uio_resid) {
770 		if (wpb->state & PIPE_WEOF) {
771 			error = EPIPE;
772 			break;
773 		}
774 
775 		/*
776 		 * Don't hog the cpu.
777 		 */
778 		if (bigwrite && --bigcount == 0) {
779 			lwkt_user_yield();
780 			bigcount = 10;
781 			if (CURSIG(curthread->td_lwp)) {
782 				error = EINTR;
783 				break;
784 			}
785 		}
786 
787 		windex = wpb->windex & (wpb->size - 1);
788 		space = wpb->size - (wpb->windex - wpb->rindex);
789 
790 		/*
791 		 * Writes of size <= PIPE_BUF must be atomic.
792 		 */
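		/*
		 * (If the whole remaining transfer of a small write cannot
		 * fit, report no space at all so the write blocks instead
		 * of being split.)
		 */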
793 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
794 			space = 0;
795 
796 		/*
797 		 * Write to fill, read size handles write hysteresis.  Also
798 		 * additional restrictions can cause select-based non-blocking
799 		 * writes to spin.
800 		 */
801 		if (space > 0) {
802 			size_t segsize;
803 
804 			/*
805 			 * We want to notify a potentially waiting reader
806 			 * before we exhaust the write buffer for SMP
807 			 * pipelining.  Otherwise the write/read will begin
808 			 * to ping-pong.
809 			 */
810 			space = szmin(space, uio->uio_resid);
811 			if (space > (wpb->size >> 1))
812 				space = (wpb->size >> 1);
813 
814 			/*
815 			 * First segment to transfer is minimum of
816 			 * transfer size and contiguous space in
817 			 * pipe buffer.  If first segment to transfer
818 			 * is less than the transfer size, we've got
819 			 * a wraparound in the buffer.
820 			 */
821 			segsize = wpb->size - windex;
822 			if (segsize > space)
823 				segsize = space;
824 
825 			/*
826 			 * If this is the first loop and the reader is
827 			 * blocked, do a preemptive wakeup of the reader.
828 			 *
829 			 * On SMP the IPI latency plus the wlock interlock
830 			 * on the reader side is the fastest way to get the
831 			 * reader going.  (The scheduler will hard loop on
832 			 * lock tokens).
833 			 */
834 			if (wcount == 0)
835 				pipesignal(wpb, PIPE_WANTR);
836 
837 			/*
838 			 * Transfer segment, which may include a wrap-around.
839 			 * Update windex to account for both pieces in one go
840 			 * so the reader can read() the data atomically.
841 			 */
842 			error = uiomove(&wpb->buffer[windex], segsize, uio);
843 			if (error == 0 && segsize < space) {
844 				segsize = space - segsize;
845 				error = uiomove(&wpb->buffer[0], segsize, uio);
846 			}
847 			if (error)
848 				break;
849 
850 			/*
851 			 * Memory fence prior to windex updating (note: not
852 			 * needed on Intel, where this is a NOP).
853 			 */
854 			cpu_sfence();
855 			wpb->windex += space;
856 
857 			/*
858 			 * Signal reader
859 			 */
860 			if (wcount != 0)
861 				pipesignal(wpb, PIPE_WANTR);
862 			wcount += space;
863 			continue;
864 		}
865 
866 		/*
867 		 * Wakeup any pending reader
868 		 */
869 		pipesignal(wpb, PIPE_WANTR);
870 
871 		/*
872 		 * don't block on non-blocking I/O
873 		 */
874 		if (nbio) {
875 			error = EAGAIN;
876 			break;
877 		}
878 
879 #ifdef _RDTSC_SUPPORTED_
880 		if (pipe_delay) {
881 			int64_t tsc_target;
882 			int good = 0;
883 
884 			tsc_target = tsc_get_target(pipe_delay);
885 			while (tsc_test_target(tsc_target) == 0) {
886 				cpu_lfence();
887 				space = wpb->size - (wpb->windex - wpb->rindex);
888 				if ((space < uio->uio_resid) &&
889 				    (orig_resid <= PIPE_BUF)) {
890 					space = 0;
891 				}
892 				if (space) {
893 					good = 1;
894 					break;
895 				}
896 				cpu_pause();
897 			}
898 			if (good)
899 				continue;
900 		}
901 #endif
902 
903 		/*
904 		 * Interlocked test.  Atomic op enforces the memory barrier.
905 		 */
906 		tsleep_interlock(wpb, PCATCH);
907 		atomic_set_int(&wpb->state, PIPE_WANTW);
908 
909 		/*
910 		 * Retest space available after memory barrier above.
911 		 * Writes of size <= PIPE_BUF must be atomic.
912 		 */
913 		space = wpb->size - (wpb->windex - wpb->rindex);
914 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
915 			space = 0;
916 
917 		/*
918 		 * Retest EOF after memory barrier above.
919 		 */
920 		if (wpb->state & PIPE_WEOF) {
921 			error = EPIPE;
922 			break;
923 		}
924 
925 		/*
926 		 * We have no more space and have something to offer,
927 		 * wake up select/poll/kq.
928 		 */
929 		if (space == 0) {
930 			pipewakeup(wpb, 1);
931 			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
932 		}
933 
934 		/*
935 		 * Break out if we errored or the read side wants us to go
936 		 * away.
937 		 */
938 		if (error)
939 			break;
940 		if (wpb->state & PIPE_WEOF) {
941 			error = EPIPE;
942 			break;
943 		}
944 	}
945 	pipe_end_uio(&wpb->wip);
946 
947 	/*
948 	 * If we have put any characters in the buffer, we wake up
949 	 * the reader.
950 	 *
951 	 * Both rlock and wlock are required to be able to modify pipe_state.
952 	 */
953 	if (wpb->windex != wpb->rindex) {
954 		pipesignal(wpb, PIPE_WANTR);
955 		pipewakeup(wpb, 1);
956 	}
957 
958 	/*
959 	 * Don't return EPIPE if I/O was successful
960 	 */
961 	if ((wpb->rindex == wpb->windex) &&
962 	    (uio->uio_resid == 0) &&
963 	    (error == EPIPE)) {
964 		error = 0;
965 	}
966 
967 	if (error == 0 && wpb->lticks != ticks) {
968 		vfs_timestamp(&wpb->mtime);
969 		wpb->lticks = ticks;
970 	}
971 
972 	/*
973 	 * We have something to offer,
974 	 * wake up select/poll/kq.
975 	 */
976 	/*space = wpb->windex - wpb->rindex;*/
977 	lwkt_reltoken(&wpb->wlock);
978 
979 	return (error);
980 }
981 
982 /*
983  * we implement a very minimal set of ioctls for compatibility with sockets.
984  */
985 static int
986 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
987 	   struct ucred *cred, struct sysmsg *msg)
988 {
989 	struct pipebuf *rpb;
990 	struct pipe *pipe;
991 	int error;
992 
993 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
994 	if ((intptr_t)fp->f_data & 1) {
995 		rpb = &pipe->bufferB;
996 	} else {
997 		rpb = &pipe->bufferA;
998 	}
999 
1000 	lwkt_gettoken(&rpb->rlock);
1001 	lwkt_gettoken(&rpb->wlock);
1002 
1003 	switch (cmd) {
1004 	case FIOASYNC:
1005 		if (*(int *)data) {
1006 			atomic_set_int(&rpb->state, PIPE_ASYNC);
1007 		} else {
1008 			atomic_clear_int(&rpb->state, PIPE_ASYNC);
1009 		}
1010 		error = 0;
1011 		break;
1012 	case FIONREAD:
1013 		*(int *)data = (int)(rpb->windex - rpb->rindex);
1014 		error = 0;
1015 		break;
1016 	case FIOSETOWN:
1017 		error = fsetown(*(int *)data, &rpb->sigio);
1018 		break;
1019 	case FIOGETOWN:
1020 		*(int *)data = fgetown(&rpb->sigio);
1021 		error = 0;
1022 		break;
1023 	case TIOCSPGRP:
1024 		/* This is deprecated, FIOSETOWN should be used instead. */
1025 		error = fsetown(-(*(int *)data), &rpb->sigio);
1026 		break;
1027 
1028 	case TIOCGPGRP:
1029 		/* This is deprecated, FIOGETOWN should be used instead. */
1030 		*(int *)data = -fgetown(&rpb->sigio);
1031 		error = 0;
1032 		break;
1033 	default:
1034 		error = ENOTTY;
1035 		break;
1036 	}
1037 	lwkt_reltoken(&rpb->wlock);
1038 	lwkt_reltoken(&rpb->rlock);
1039 
1040 	return (error);
1041 }
1042 
1043 /*
1044  * MPSAFE
1045  */
1046 static int
1047 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1048 {
1049 	struct pipebuf *rpb;
1050 	struct pipe *pipe;
1051 
1052 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1053 	if ((intptr_t)fp->f_data & 1) {
1054 		rpb = &pipe->bufferB;
1055 	} else {
1056 		rpb = &pipe->bufferA;
1057 	}
1058 
1059 	bzero((caddr_t)ub, sizeof(*ub));
1060 	ub->st_mode = S_IFIFO;
1061 	ub->st_blksize = rpb->size;
1062 	ub->st_size = rpb->windex - rpb->rindex;
1063 	ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
1064 	ub->st_atimespec = rpb->atime;
1065 	ub->st_mtimespec = rpb->mtime;
1066 	ub->st_ctimespec = pipe->ctime;
1067 	ub->st_uid = fp->f_cred->cr_uid;
1068 	ub->st_gid = fp->f_cred->cr_gid;
1069 	ub->st_ino = pipe->inum;
1070 	/*
1071 	 * Left as 0: st_dev, st_nlink, st_rdev,
1072 	 * st_flags, st_gen.
1073 	 * XXX (st_dev, st_ino) should be unique.
1074 	 */
1075 
1076 	return (0);
1077 }
1078 
1079 static int
1080 pipe_close(struct file *fp)
1081 {
1082 	struct pipebuf *rpb;
1083 	struct pipebuf *wpb;
1084 	struct pipe *pipe;
1085 
1086 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1087 	if ((intptr_t)fp->f_data & 1) {
1088 		rpb = &pipe->bufferB;
1089 		wpb = &pipe->bufferA;
1090 	} else {
1091 		rpb = &pipe->bufferA;
1092 		wpb = &pipe->bufferB;
1093 	}
1094 
1095 	fp->f_ops = &badfileops;
1096 	fp->f_data = NULL;
1097 	funsetown(&rpb->sigio);
1098 	pipeclose(pipe, rpb, wpb);
1099 
1100 	return (0);
1101 }
1102 
1103 /*
1104  * Shutdown one or both directions of a full-duplex pipe.
1105  */
1106 static int
1107 pipe_shutdown(struct file *fp, int how)
1108 {
1109 	struct pipebuf *rpb;
1110 	struct pipebuf *wpb;
1111 	struct pipe *pipe;
1112 	int error = EPIPE;
1113 
1114 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1115 	if ((intptr_t)fp->f_data & 1) {
1116 		rpb = &pipe->bufferB;
1117 		wpb = &pipe->bufferA;
1118 	} else {
1119 		rpb = &pipe->bufferA;
1120 		wpb = &pipe->bufferB;
1121 	}
1122 
1123 	/*
1124 	 * We modify pipe_state on both pipes, which means we need
1125 	 * all four tokens!
1126 	 */
1127 	lwkt_gettoken(&rpb->rlock);
1128 	lwkt_gettoken(&rpb->wlock);
1129 	lwkt_gettoken(&wpb->rlock);
1130 	lwkt_gettoken(&wpb->wlock);
1131 
1132 	switch(how) {
1133 	case SHUT_RDWR:
1134 	case SHUT_RD:
1135 		/*
1136 		 * EOF on my reads and peer writes
1137 		 */
1138 		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1139 		if (rpb->state & PIPE_WANTR) {
1140 			rpb->state &= ~PIPE_WANTR;
1141 			wakeup(rpb);
1142 		}
1143 		if (rpb->state & PIPE_WANTW) {
1144 			rpb->state &= ~PIPE_WANTW;
1145 			wakeup(rpb);
1146 		}
1147 		error = 0;
1148 		if (how == SHUT_RD)
1149 			break;
1150 		/* fall through */
1151 	case SHUT_WR:
1152 		/*
1153 		 * EOF on peer reads and my writes
1154 		 */
1155 		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1156 		if (wpb->state & PIPE_WANTR) {
1157 			wpb->state &= ~PIPE_WANTR;
1158 			wakeup(wpb);
1159 		}
1160 		if (wpb->state & PIPE_WANTW) {
1161 			wpb->state &= ~PIPE_WANTW;
1162 			wakeup(wpb);
1163 		}
1164 		error = 0;
1165 		break;
1166 	}
1167 	pipewakeup(rpb, 1);
1168 	pipewakeup(wpb, 1);
1169 
1170 	lwkt_reltoken(&wpb->wlock);
1171 	lwkt_reltoken(&wpb->rlock);
1172 	lwkt_reltoken(&rpb->wlock);
1173 	lwkt_reltoken(&rpb->rlock);
1174 
1175 	return (error);
1176 }
1177 
1178 /*
1179  * Destroy the pipe buffer.
1180  */
1181 static void
1182 pipe_free_kmem(struct pipebuf *pb)
1183 {
1184 	if (pb->buffer != NULL) {
1185 		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1186 		pb->buffer = NULL;
1187 		pb->object = NULL;
1188 	}
1189 }
1190 
1191 /*
1192  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1193  * and writing on wpb.  This routine must be called twice with the pipebufs
1194  * reversed to close both directions.
1195  */
1196 static void
1197 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1198 {
1199 	globaldata_t gd;
1200 
1201 	if (pipe == NULL)
1202 		return;
1203 
1204 	/*
1205 	 * We need both the read and write tokens to modify pipe_state.
1206 	 */
1207 	lwkt_gettoken(&rpb->rlock);
1208 	lwkt_gettoken(&rpb->wlock);
1209 
1210 	/*
1211 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1212 	 * wakeup anyone blocked on our pipe.  No action if our side
1213 	 * is already closed.
1214 	 */
1215 	if (rpb->state & PIPE_CLOSED) {
1216 		lwkt_reltoken(&rpb->wlock);
1217 		lwkt_reltoken(&rpb->rlock);
1218 		return;
1219 	}
1220 
1221 	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1222 	pipewakeup(rpb, 1);
1223 	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1224 		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1225 		wakeup(rpb);
1226 	}
1227 	lwkt_reltoken(&rpb->wlock);
1228 	lwkt_reltoken(&rpb->rlock);
1229 
1230 	/*
1231 	 * Disconnect from peer.
1232 	 */
1233 	lwkt_gettoken(&wpb->rlock);
1234 	lwkt_gettoken(&wpb->wlock);
1235 
1236 	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1237 	pipewakeup(wpb, 1);
1238 	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1239 		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1240 		wakeup(wpb);
1241 	}
1242 	if (SLIST_FIRST(&wpb->kq.ki_note))
1243 		KNOTE(&wpb->kq.ki_note, 0);
1244 	lwkt_reltoken(&wpb->wlock);
1245 	lwkt_reltoken(&wpb->rlock);
1246 
1247 	/*
1248 	 * Free resources once both sides are closed.  We maintain a pcpu
1249 	 * cache to improve performance, so the actual tear-down case is
1250 	 * limited to bulk situations.
1251 	 *
1252 	 * However, the bulk tear-down case can cause intense contention
1253 	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1254 	 * of processes are killed at the same time.  To deal with this we
1255 	 * use a pcpu mutex to maintain concurrency but also limit the
1256 	 * number of threads banging on the map and pmap.
1257 	 *
1258 	 * We use the mtx mechanism instead of the lockmgr mechanism because
1259 	 * the mtx mechanism utilizes a queued design which will not break
1260 	 * down in the face of thousands to hundreds of thousands of
1261 	 * processes trying to free pipes simultaneously.  The lockmgr
1262 	 * mechanism will wind up waking them all up each time a lock
1263 	 * cycles.
1264 	 */
1265 	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1266 		gd = mycpu;
1267 		if (gd->gd_pipeqcount >= pipe_maxcache) {
1268 			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1269 			pipe_free_kmem(rpb);
1270 			pipe_free_kmem(wpb);
1271 			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1272 			kfree(pipe, M_PIPE);
1273 		} else {
1274 			rpb->state = 0;
1275 			wpb->state = 0;
1276 			pipe->next = gd->gd_pipeq;
1277 			gd->gd_pipeq = pipe;
1278 			++gd->gd_pipeqcount;
1279 		}
1280 	}
1281 }
1282 
1283 static int
1284 pipe_kqfilter(struct file *fp, struct knote *kn)
1285 {
1286 	struct pipebuf *rpb;
1287 	struct pipebuf *wpb;
1288 	struct pipe *pipe;
1289 
1290 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1291 	if ((intptr_t)fp->f_data & 1) {
1292 		rpb = &pipe->bufferB;
1293 		wpb = &pipe->bufferA;
1294 	} else {
1295 		rpb = &pipe->bufferA;
1296 		wpb = &pipe->bufferB;
1297 	}
1298 
1299 	switch (kn->kn_filter) {
1300 	case EVFILT_READ:
1301 		kn->kn_fop = &pipe_rfiltops;
1302 		break;
1303 	case EVFILT_WRITE:
1304 		kn->kn_fop = &pipe_wfiltops;
1305 		if (wpb->state & PIPE_CLOSED) {
1306 			/* other end of pipe has been closed */
1307 			return (EPIPE);
1308 		}
1309 		break;
1310 	default:
1311 		return (EOPNOTSUPP);
1312 	}
1313 
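	/*
	 * Tag kn_hook the same way fp->f_data is tagged so that
	 * filt_pipedetach() can locate the correct pipebuf later.
	 */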
1314 	if (rpb == &pipe->bufferA)
1315 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1316 	else
1317 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1318 
1319 	knote_insert(&rpb->kq.ki_note, kn);
1320 
1321 	return (0);
1322 }
1323 
1324 static void
1325 filt_pipedetach(struct knote *kn)
1326 {
1327 	struct pipebuf *rpb;
1328 	struct pipebuf *wpb;
1329 	struct pipe *pipe;
1330 
1331 	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1332 	if ((intptr_t)kn->kn_hook & 1) {
1333 		rpb = &pipe->bufferB;
1334 		wpb = &pipe->bufferA;
1335 	} else {
1336 		rpb = &pipe->bufferA;
1337 		wpb = &pipe->bufferB;
1338 	}
1339 	knote_remove(&rpb->kq.ki_note, kn);
1340 }
1341 
1342 /*ARGSUSED*/
1343 static int
1344 filt_piperead(struct knote *kn, long hint)
1345 {
1346 	struct pipebuf *rpb;
1347 	struct pipebuf *wpb;
1348 	struct pipe *pipe;
1349 	int ready = 0;
1350 
1351 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1352 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1353 		rpb = &pipe->bufferB;
1354 		wpb = &pipe->bufferA;
1355 	} else {
1356 		rpb = &pipe->bufferA;
1357 		wpb = &pipe->bufferB;
1358 	}
1359 
1360 	/*
1361 	 * We shouldn't need the pipe locks because the knote itself is
1362 	 * locked via KN_PROCESSING.  If we lose a race against the writer,
1363 	 * the writer will just issue a KNOTE() after us.
1364 	 */
1365 #if 0
1366 	lwkt_gettoken(&rpb->rlock);
1367 	lwkt_gettoken(&rpb->wlock);
1368 #endif
1369 
1370 	kn->kn_data = rpb->windex - rpb->rindex;
1371 	if (kn->kn_data < 0)
1372 		kn->kn_data = 0;
1373 
1374 	if (rpb->state & PIPE_REOF) {
1375 		/*
1376 		 * Only set NODATA if all data has been exhausted
1377 		 */
1378 		if (kn->kn_data == 0)
1379 			kn->kn_flags |= EV_NODATA;
1380 		kn->kn_flags |= EV_EOF;
1381 		ready = 1;
1382 	}
1383 
1384 #if 0
1385 	lwkt_reltoken(&rpb->wlock);
1386 	lwkt_reltoken(&rpb->rlock);
1387 #endif
1388 
1389 	if (!ready)
1390 		ready = kn->kn_data > 0;
1391 
1392 	return (ready);
1393 }
1394 
1395 /*ARGSUSED*/
1396 static int
1397 filt_pipewrite(struct knote *kn, long hint)
1398 {
1399 	struct pipebuf *rpb;
1400 	struct pipebuf *wpb;
1401 	struct pipe *pipe;
1402 	int ready = 0;
1403 
1404 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1405 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1406 		rpb = &pipe->bufferB;
1407 		wpb = &pipe->bufferA;
1408 	} else {
1409 		rpb = &pipe->bufferA;
1410 		wpb = &pipe->bufferB;
1411 	}
1412 
1413 	kn->kn_data = 0;
1414 	if (wpb->state & PIPE_CLOSED) {
1415 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1416 		return (1);
1417 	}
1418 
1419 	/*
1420 	 * We shouldn't need the pipe locks because the knote itself is
1421 	 * locked via KN_PROCESSING.  If we lose a race against the reader,
1422 	 * the reader will just issue a KNOTE() after us.
1423 	 */
1424 #if 0
1425 	lwkt_gettoken(&wpb->rlock);
1426 	lwkt_gettoken(&wpb->wlock);
1427 #endif
1428 
1429 	if (wpb->state & PIPE_WEOF) {
1430 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1431 		ready = 1;
1432 	}
1433 
1434 	if (!ready) {
1435 		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1436 		if (kn->kn_data < 0)
1437 			kn->kn_data = 0;
1438 	}
1439 
1440 #if 0
1441 	lwkt_reltoken(&wpb->wlock);
1442 	lwkt_reltoken(&wpb->rlock);
1443 #endif
1444 
1445 	if (!ready)
1446 		ready = kn->kn_data >= PIPE_BUF;
1447 
1448 	return (ready);
1449 }
1450