xref: /dragonfly/sys/kern/sys_pipe.c (revision 2dac8a3e)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23 
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68 
69 #include <machine/cpufunc.h>
70 
71 struct pipegdlock {
72 	struct mtx	mtx;
73 } __cachealign;
74 
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 		struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 		struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 		struct ucred *cred, struct sysmsg *msg);
88 
89 __read_mostly static struct fileops pipeops = {
90 	.fo_read = pipe_read,
91 	.fo_write = pipe_write,
92 	.fo_ioctl = pipe_ioctl,
93 	.fo_kqfilter = pipe_kqfilter,
94 	.fo_stat = pipe_stat,
95 	.fo_close = pipe_close,
96 	.fo_shutdown = pipe_shutdown
97 };
98 
99 static void	filt_pipedetach(struct knote *kn);
100 static int	filt_piperead(struct knote *kn, long hint);
101 static int	filt_pipewrite(struct knote *kn, long hint);
102 
103 __read_mostly static struct filterops pipe_rfiltops =
104 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 __read_mostly static struct filterops pipe_wfiltops =
106 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107 
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109 
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111 
112 __read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 __read_mostly static struct pipegdlock *pipe_gdlocks;
114 
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118 
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 __read_mostly static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127 
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the writer completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 __read_mostly static int pipe_delay = 4000;	/* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143 
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151 	size_t mbytes = kmem_lim_size();
152 	int n;
153 
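	/*
	 * Scale the per-cpu pipe cache with system memory (kmem_lim_size()
	 * reports megabytes, so the thresholds below are roughly 7GB and
	 * 15GB), but only while pipe_maxcache is still at its compiled-in
	 * default.
	 */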
154 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155 		if (mbytes >= 7 * 1024)
156 			pipe_maxcache *= 2;
157 		if (mbytes >= 15 * 1024)
158 			pipe_maxcache *= 2;
159 	}
160 	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
161 			     M_PIPE, M_WAITOK | M_ZERO);
162 	for (n = 0; n < ncpus; ++n)
163 		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
164 }
165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
166 
167 static void pipeclose (struct pipe *pipe,
168 		struct pipebuf *pbr, struct pipebuf *pbw);
169 static void pipe_free_kmem (struct pipebuf *buf);
170 static int pipe_create (struct pipe **pipep);
171 
172 /*
173  * Test and clear the specified flag, wakeup(pb) if it was set.
174  * This function must also act as a memory barrier.
175  */
176 static __inline void
177 pipesignal(struct pipebuf *pb, uint32_t flags)
178 {
179 	uint32_t oflags;
180 	uint32_t nflags;
181 
182 	for (;;) {
183 		oflags = pb->state;
184 		cpu_ccfence();
185 		nflags = oflags & ~flags;
186 		if (atomic_cmpset_int(&pb->state, oflags, nflags))
187 			break;
188 	}
189 	if (oflags & flags)
190 		wakeup(pb);
191 }
192 
193 /*
194  * Wake up pipe event waiters: SIGIO (if async) plus any registered knotes.
195  */
196 static __inline void
197 pipewakeup(struct pipebuf *pb, int dosigio)
198 {
199 	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
200 		lwkt_gettoken(&sigio_token);
201 		pgsigio(pb->sigio, SIGIO, 0);
202 		lwkt_reltoken(&sigio_token);
203 	}
204 	KNOTE(&pb->kq.ki_note, 0);
205 }
206 
207 /*
208  * These routines are called before and after a UIO.  The UIO
209  * may block, causing our held tokens to be lost temporarily.
210  *
211  * We use these routines to serialize reads against other reads
212  * and writes against other writes.
213  *
214  * The appropriate token is held on entry so *ipp does not race.
215  */
216 static __inline int
217 pipe_start_uio(int *ipp)
218 {
219 	int error;
220 
221 	while (*ipp) {
222 		*ipp = -1;
223 		error = tsleep(ipp, PCATCH, "pipexx", 0);
224 		if (error)
225 			return (error);
226 	}
227 	*ipp = 1;
228 	return (0);
229 }
230 
231 static __inline void
232 pipe_end_uio(int *ipp)
233 {
234 	if (*ipp < 0) {
235 		*ipp = 0;
236 		wakeup(ipp);
237 	} else {
238 		KKASSERT(*ipp > 0);
239 		*ipp = 0;
240 	}
241 }
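/*
 * The flag addressed by ipp (a pipebuf's rip or wip field) has three
 * states: 0 means no uio is in progress, 1 means a uio is in progress,
 * and -1 means a uio is in progress with at least one other thread
 * sleeping on the address, so pipe_end_uio() only needs to wakeup()
 * in the negative case.
 */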
242 
243 /*
244  * The pipe system call for the DTYPE_PIPE type of pipes
245  *
246  * pipe_args(int dummy)
247  *
248  * MPSAFE
249  */
250 int
251 sys_pipe(struct pipe_args *uap)
252 {
253 	return kern_pipe(uap->sysmsg_fds, 0);
254 }
255 
256 int
257 sys_pipe2(struct pipe2_args *uap)
258 {
259 	return kern_pipe(uap->sysmsg_fds, uap->flags);
260 }
261 
262 int
263 kern_pipe(long *fds, int flags)
264 {
265 	struct thread *td = curthread;
266 	struct filedesc *fdp = td->td_proc->p_fd;
267 	struct file *rf, *wf;
268 	struct pipe *pipe;
269 	int fd1, fd2, error;
270 
271 	pipe = NULL;
272 	if (pipe_create(&pipe)) {
273 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
274 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
275 		return (ENFILE);
276 	}
277 
278 	error = falloc(td->td_lwp, &rf, &fd1);
279 	if (error) {
280 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
281 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
282 		return (error);
283 	}
284 	fds[0] = fd1;
285 
286 	/*
287 	 * Warning: once we've gotten past allocation of the fd for the
288 	 * read-side, we can only drop the read side via fdrop() in order
289 	 * to avoid races against processes which manage to dup() the read
290 	 * side while we are blocked trying to allocate the write side.
291 	 */
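	/*
	 * Each pipe is full-duplex: bufferA and bufferB are independent
	 * pipebufs.  The low bit of f_data tags which end a descriptor
	 * represents, and the fileops handlers mask it off to recover the
	 * pipe pointer and select the matching pipebuf, e.g.:
	 *
	 *	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	 */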
292 	rf->f_type = DTYPE_PIPE;
293 	rf->f_flag = FREAD | FWRITE;
294 	rf->f_ops = &pipeops;
295 	rf->f_data = (void *)((intptr_t)pipe | 0);
296 	if (flags & O_NONBLOCK)
297 		rf->f_flag |= O_NONBLOCK;
298 	if (flags & O_CLOEXEC)
299 		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
300 
301 	error = falloc(td->td_lwp, &wf, &fd2);
302 	if (error) {
303 		fsetfd(fdp, NULL, fd1);
304 		fdrop(rf);
305 		/* pipeA has been closed by fdrop() */
306 		/* close pipeB here */
307 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
308 		return (error);
309 	}
310 	wf->f_type = DTYPE_PIPE;
311 	wf->f_flag = FREAD | FWRITE;
312 	wf->f_ops = &pipeops;
313 	wf->f_data = (void *)((intptr_t)pipe | 1);
314 	if (flags & O_NONBLOCK)
315 		wf->f_flag |= O_NONBLOCK;
316 	if (flags & O_CLOEXEC)
317 		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
318 
319 	fds[1] = fd2;
320 
321 	/*
322 	 * Once activated the peer relationship remains valid until
323 	 * both sides are closed.
324 	 */
325 	fsetfd(fdp, rf, fd1);
326 	fsetfd(fdp, wf, fd2);
327 	fdrop(rf);
328 	fdrop(wf);
329 
330 	return (0);
331 }
332 
333 /*
334  * [re]allocates KVA for the pipe's circular buffer.  The space is
335  * pageable.  Called twice to set up full-duplex communications.
336  *
337  * NOTE: Independent vm_object's are used to improve performance.
338  *
339  * Returns 0 on success, ENOMEM on failure.
340  */
341 static int
342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
343 {
344 	struct vm_object *object;
345 	caddr_t buffer;
346 	vm_pindex_t npages;
347 	int error;
348 
349 	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
350 	if (size < 16384)
351 		size = 16384;
352 	if (size > 1024*1024)
353 		size = 1024*1024;
354 
355 	npages = round_page(size) / PAGE_SIZE;
356 	object = pb->object;
357 
358 	/*
359 	 * [re]create the object if necessary and reserve space for it
360 	 * in the kernel_map.  The object and memory are pageable.  On
361 	 * success, free the old resources before assigning the new
362 	 * ones.
363 	 */
364 	if (object == NULL || object->size != npages) {
365 		object = vm_object_allocate(OBJT_DEFAULT, npages);
366 		buffer = (caddr_t)vm_map_min(&kernel_map);
367 
368 		error = vm_map_find(&kernel_map, object, NULL,
369 				    0, (vm_offset_t *)&buffer, size,
370 				    PAGE_SIZE, TRUE,
371 				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
372 				    VM_PROT_ALL, VM_PROT_ALL, 0);
373 
374 		if (error != KERN_SUCCESS) {
375 			vm_object_deallocate(object);
376 			return (ENOMEM);
377 		}
378 		pipe_free_kmem(pb);
379 		pb->object = object;
380 		pb->buffer = buffer;
381 		pb->size = size;
382 	}
383 	pb->rindex = 0;
384 	pb->windex = 0;
385 
386 	return (0);
387 }
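/*
 * Note that rindex and windex are free-running byte counters: they are
 * never masked, so (windex - rindex) is always the number of bytes in the
 * pipe, and the offset into the circular buffer is obtained by masking an
 * index with (size - 1).  This assumes the buffer size is a power of two,
 * which holds for the default sizes established above.
 */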
388 
389 /*
390  * Initialize and allocate VM and memory for pipe, pulling the pipe from
391  * our per-cpu cache if possible.
392  *
393  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
394  * must still deallocate the pipe on failure.
395  */
396 static int
397 pipe_create(struct pipe **pipep)
398 {
399 	globaldata_t gd = mycpu;
400 	struct pipe *pipe;
401 	int error;
402 
403 	if ((pipe = gd->gd_pipeq) != NULL) {
404 		gd->gd_pipeq = pipe->next;
405 		--gd->gd_pipeqcount;
406 		pipe->next = NULL;
407 	} else {
408 		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
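		/*
		 * Generate a pseudo inode number for stat() without locking:
		 * stride the per-cpu counter by ncpus and offset by the cpu
		 * id, +2 so the result is never 0 or 1.
		 */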
409 		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
410 		lwkt_token_init(&pipe->bufferA.rlock, "piper");
411 		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
412 		lwkt_token_init(&pipe->bufferB.rlock, "piper");
413 		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
414 	}
415 	*pipep = pipe;
416 	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
417 		return (error);
418 	}
419 	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
420 		return (error);
421 	}
422 	vfs_timestamp(&pipe->ctime);
423 	pipe->bufferA.atime = pipe->ctime;
424 	pipe->bufferA.mtime = pipe->ctime;
425 	pipe->bufferB.atime = pipe->ctime;
426 	pipe->bufferB.mtime = pipe->ctime;
427 	pipe->open_count = 2;
428 
429 	return (0);
430 }
431 
432 /*
433  * Read data from a pipe
434  */
435 static int
436 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
437 {
438 	struct pipebuf *rpb;
439 	struct pipebuf *wpb;
440 	struct pipe *pipe;
441 	size_t nread = 0;
442 	size_t size;	/* total bytes available */
443 	size_t nsize;	/* total bytes to read */
444 	size_t rindex;	/* contiguous bytes available */
445 	int notify_writer;
446 	int bigread;
447 	int bigcount;
448 	int error;
449 	int nbio;
450 
451 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
452 	if ((intptr_t)fp->f_data & 1) {
453 		rpb = &pipe->bufferB;
454 		wpb = &pipe->bufferA;
455 	} else {
456 		rpb = &pipe->bufferA;
457 		wpb = &pipe->bufferB;
458 	}
459 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
460 
461 	if (uio->uio_resid == 0)
462 		return(0);
463 
464 	/*
465 	 * Calculate nbio
466 	 */
467 	if (fflags & O_FBLOCKING)
468 		nbio = 0;
469 	else if (fflags & O_FNONBLOCKING)
470 		nbio = 1;
471 	else if (fp->f_flag & O_NONBLOCK)
472 		nbio = 1;
473 	else
474 		nbio = 0;
475 
476 	/*
477 	 * 'quick' NBIO test before things get expensive.
478 	 */
479 	if (nbio && rpb->rindex == rpb->windex &&
480 	    (rpb->state & PIPE_REOF) == 0) {
481 		return EAGAIN;
482 	}
483 
484 	/*
485 	 * Reads are serialized.  Note however that buffer.buffer and
486 	 * buffer.size can change out from under us when the number
487 	 * of bytes in the buffer is zero due to the write-side doing a
488 	 * pipespace().
489 	 */
490 	lwkt_gettoken(&rpb->rlock);
491 	error = pipe_start_uio(&rpb->rip);
492 	if (error) {
493 		lwkt_reltoken(&rpb->rlock);
494 		return (error);
495 	}
496 	notify_writer = 0;
497 
498 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
499 	bigcount = 10;
500 
501 	while (uio->uio_resid) {
502 		/*
503 		 * Don't hog the cpu.
504 		 */
505 		if (bigread && --bigcount == 0) {
506 			lwkt_user_yield();
507 			bigcount = 10;
508 			if (CURSIG(curthread->td_lwp)) {
509 				error = EINTR;
510 				break;
511 			}
512 		}
513 
514 		/*
515 		 * lfence required to avoid read-reordering of buffer
516 		 * contents prior to validation of size.
517 		 */
518 		size = rpb->windex - rpb->rindex;
519 		cpu_lfence();
520 		if (size) {
521 			rindex = rpb->rindex & (rpb->size - 1);
522 			nsize = size;
523 			if (nsize > rpb->size - rindex)
524 				nsize = rpb->size - rindex;
525 			nsize = szmin(nsize, uio->uio_resid);
526 
527 			/*
528 			 * Limit how much we move in one go so we have a
529 			 * chance to kick the writer while data is still
530 			 * available in the pipe.  This avoids getting into
531 			 * a ping-pong with the writer.
532 			 */
533 			if (nsize > (rpb->size >> 1))
534 				nsize = rpb->size >> 1;
535 
536 			error = uiomove(&rpb->buffer[rindex], nsize, uio);
537 			if (error)
538 				break;
539 			rpb->rindex += nsize;
540 			nread += nsize;
541 
542 			/*
543 			 * If the FIFO is still over half full just continue
544 			 * and do not try to notify the writer yet.  If
545 			 * less than half full notify any waiting writer.
546 			 */
547 			if (size - nsize > (rpb->size >> 1)) {
548 				notify_writer = 0;
549 			} else {
550 				notify_writer = 1;
551 				pipesignal(rpb, PIPE_WANTW);
552 			}
553 			continue;
554 		}
555 
556 		/*
557 		 * If the "write-side" was blocked we wake it up.  This code
558 		 * is reached when the buffer is completely emptied.
559 		 */
560 		pipesignal(rpb, PIPE_WANTW);
561 
562 		/*
563 		 * Pick up our copy loop again if the writer sent data to
564 		 * us while we were messing around.
565 		 *
566 		 * On an SMP box poll up to pipe_delay nanoseconds for new
567 		 * data.  Typically a value of 2000 to 4000 is sufficient
568 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
569 		 * is used for synchronous communications with small packets,
570 		 * and 8000 or so (8uS) will pipeline large buffer xfers
571 		 * between cpus over a pipe.
572 		 *
573 		 * For synchronous communications a hit means doing a
574 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
575 		 * whereas a miss requiring a tsleep/wakeup sequence
576 		 * will take 7uS or more.
577 		 */
578 		if (rpb->windex != rpb->rindex)
579 			continue;
580 
581 #ifdef _RDTSC_SUPPORTED_
582 		if (pipe_delay) {
583 			int64_t tsc_target;
584 			int good = 0;
585 
586 			tsc_target = tsc_get_target(pipe_delay);
587 			while (tsc_test_target(tsc_target) == 0) {
588 				cpu_lfence();
589 				if (rpb->windex != rpb->rindex) {
590 					good = 1;
591 					break;
592 				}
593 				cpu_pause();
594 			}
595 			if (good)
596 				continue;
597 		}
598 #endif
599 
600 		/*
601 		 * Detect EOF condition, do not set error.
602 		 */
603 		if (rpb->state & PIPE_REOF)
604 			break;
605 
606 		/*
607 		 * Break if some data was read, or if this was a non-blocking
608 		 * read.
609 		 */
610 		if (nread > 0)
611 			break;
612 
613 		if (nbio) {
614 			error = EAGAIN;
615 			break;
616 		}
617 
618 		/*
619 		 * Last chance, interlock with WANTR
620 		 */
621 		tsleep_interlock(rpb, PCATCH);
622 		atomic_set_int(&rpb->state, PIPE_WANTR);
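		/*
		 * tsleep_interlock() has already placed us on the sleep
		 * queue, so a wakeup() issued by the writer between the
		 * retests below and the tsleep() call is not lost; the
		 * PINTERLOCKED tsleep() consumes that earlier registration.
		 */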
623 
624 		/*
625 		 * Retest bytes available after memory barrier above.
626 		 */
627 		size = rpb->windex - rpb->rindex;
628 		if (size)
629 			continue;
630 
631 		/*
632 		 * Retest EOF after memory barrier above.
633 		 */
634 		if (rpb->state & PIPE_REOF)
635 			break;
636 
637 		/*
638 		 * Wait for more data or state change
639 		 */
640 		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
641 		if (error)
642 			break;
643 	}
644 	pipe_end_uio(&rpb->rip);
645 
646 	/*
647 	 * Update the last access time
648 	 */
649 	if (error == 0 && nread && rpb->lticks != ticks) {
650 		vfs_timestamp(&rpb->atime);
651 		rpb->lticks = ticks;
652 	}
653 
654 	/*
655 	 * If we drained the FIFO more than half way then handle
656 	 * write blocking hysteresis.
657 	 *
658 	 * Note that PIPE_WANTW cannot be set by the writer without
659 	 * it holding both rlock and wlock, so we can test it
660 	 * while holding just rlock.
661 	 */
662 	if (notify_writer) {
663 		/*
664 		 * Synchronous blocking is done on the pipe involved
665 		 */
666 		pipesignal(rpb, PIPE_WANTW);
667 
668 		/*
669 		 * But we may also have to deal with a kqueue which is
670 		 * stored on the same pipe as its descriptor, so an
671 		 * EVFILT_WRITE event waiting for our side to drain will
672 		 * be on the other side.
673 		 */
674 		pipewakeup(wpb, 0);
675 	}
676 	/*size = rpb->windex - rpb->rindex;*/
677 	lwkt_reltoken(&rpb->rlock);
678 
679 	return (error);
680 }
681 
682 static int
683 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
684 {
685 	struct pipebuf *rpb;
686 	struct pipebuf *wpb;
687 	struct pipe *pipe;
688 	size_t windex;
689 	size_t space;
690 	size_t wcount;
691 	size_t orig_resid;
692 	int bigwrite;
693 	int bigcount;
694 	int error;
695 	int nbio;
696 
697 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
698 	if ((intptr_t)fp->f_data & 1) {
699 		rpb = &pipe->bufferB;
700 		wpb = &pipe->bufferA;
701 	} else {
702 		rpb = &pipe->bufferA;
703 		wpb = &pipe->bufferB;
704 	}
705 
706 	/*
707 	 * Calculate nbio
708 	 */
709 	if (fflags & O_FBLOCKING)
710 		nbio = 0;
711 	else if (fflags & O_FNONBLOCKING)
712 		nbio = 1;
713 	else if (fp->f_flag & O_NONBLOCK)
714 		nbio = 1;
715 	else
716 		nbio = 0;
717 
718 	/*
719 	 * 'quick' NBIO test before things get expensive.
720 	 */
721 	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
722 	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
723 		return EAGAIN;
724 	}
725 
726 	/*
727 	 * Writes go to the peer.  The peer will always exist.
728 	 */
729 	lwkt_gettoken(&wpb->wlock);
730 	if (wpb->state & PIPE_WEOF) {
731 		lwkt_reltoken(&wpb->wlock);
732 		return (EPIPE);
733 	}
734 
735 	/*
736 	 * Degenerate case (EPIPE takes prec)
737 	 */
738 	if (uio->uio_resid == 0) {
739 		lwkt_reltoken(&wpb->wlock);
740 		return(0);
741 	}
742 
743 	/*
744 	 * Writes are serialized (start_uio must be called with wlock)
745 	 */
746 	error = pipe_start_uio(&wpb->wip);
747 	if (error) {
748 		lwkt_reltoken(&wpb->wlock);
749 		return (error);
750 	}
751 
752 	orig_resid = uio->uio_resid;
753 	wcount = 0;
754 
755 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
756 	bigcount = 10;
757 
758 	while (uio->uio_resid) {
759 		if (wpb->state & PIPE_WEOF) {
760 			error = EPIPE;
761 			break;
762 		}
763 
764 		/*
765 		 * Don't hog the cpu.
766 		 */
767 		if (bigwrite && --bigcount == 0) {
768 			lwkt_user_yield();
769 			bigcount = 10;
770 			if (CURSIG(curthread->td_lwp)) {
771 				error = EINTR;
772 				break;
773 			}
774 		}
775 
776 		windex = wpb->windex & (wpb->size - 1);
777 		space = wpb->size - (wpb->windex - wpb->rindex);
778 
779 		/*
780 		 * Writes of size <= PIPE_BUF must be atomic.
781 		 */
782 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
783 			space = 0;
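		/*
		 * Forcing space to 0 here makes the writer block (or return
		 * EAGAIN) rather than split a small write, preserving the
		 * guarantee that writes of up to PIPE_BUF bytes are not
		 * interleaved with data from other writers.
		 */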
784 
785 		/*
786 		 * Write to fill, read size handles write hysteresis.  Also
787 		 * additional restrictions can cause select-based non-blocking
788 		 * writes to spin.
789 		 */
790 		if (space > 0) {
791 			size_t segsize;
792 
793 			/*
794 			 * We want to notify a potentially waiting reader
795 			 * before we exhaust the write buffer for SMP
796 			 * pipelining.  Otherwise the write/read will begin
797 			 * to ping-pong.
798 			 */
799 			space = szmin(space, uio->uio_resid);
800 			if (space > (wpb->size >> 1))
801 				space = (wpb->size >> 1);
802 
803 			/*
804 			 * First segment to transfer is minimum of
805 			 * transfer size and contiguous space in
806 			 * pipe buffer.  If first segment to transfer
807 			 * is less than the transfer size, we've got
808 			 * a wraparound in the buffer.
809 			 */
810 			segsize = wpb->size - windex;
811 			if (segsize > space)
812 				segsize = space;
813 
814 			/*
815 			 * If this is the first loop and the reader is
816 			 * blocked, do a preemptive wakeup of the reader.
817 			 *
818 			 * On SMP the IPI latency plus the wlock interlock
819 			 * on the reader side is the fastest way to get the
820 			 * reader going.  (The scheduler will hard loop on
821 			 * lock tokens).
822 			 */
823 			if (wcount == 0)
824 				pipesignal(wpb, PIPE_WANTR);
825 
826 			/*
827 			 * Transfer segment, which may include a wrap-around.
828 			 * Update windex to account for both all in one go
829 			 * so the reader can read() the data atomically.
830 			 */
831 			error = uiomove(&wpb->buffer[windex], segsize, uio);
832 			if (error == 0 && segsize < space) {
833 				segsize = space - segsize;
834 				error = uiomove(&wpb->buffer[0], segsize, uio);
835 			}
836 			if (error)
837 				break;
838 
839 			/*
840 			 * Memory fence prior to windex updating (note: not
841 			 * needed on Intel, where this is a NOP).
842 			 */
843 			cpu_sfence();
844 			wpb->windex += space;
845 
846 			/*
847 			 * Signal reader
848 			 */
849 			if (wcount != 0)
850 				pipesignal(wpb, PIPE_WANTR);
851 			wcount += space;
852 			continue;
853 		}
854 
855 		/*
856 		 * Wakeup any pending reader
857 		 */
858 		pipesignal(wpb, PIPE_WANTR);
859 
860 		/*
861 		 * don't block on non-blocking I/O
862 		 */
863 		if (nbio) {
864 			error = EAGAIN;
865 			break;
866 		}
867 
868 #ifdef _RDTSC_SUPPORTED_
869 		if (pipe_delay) {
870 			int64_t tsc_target;
871 			int good = 0;
872 
873 			tsc_target = tsc_get_target(pipe_delay);
874 			while (tsc_test_target(tsc_target) == 0) {
875 				cpu_lfence();
876 				space = wpb->size - (wpb->windex - wpb->rindex);
877 				if ((space < uio->uio_resid) &&
878 				    (orig_resid <= PIPE_BUF)) {
879 					space = 0;
880 				}
881 				if (space) {
882 					good = 1;
883 					break;
884 				}
885 				cpu_pause();
886 			}
887 			if (good)
888 				continue;
889 		}
890 #endif
891 
892 		/*
893 		 * Interlocked test.   Atomic op enforces the memory barrier.
894 		 */
895 		tsleep_interlock(wpb, PCATCH);
896 		atomic_set_int(&wpb->state, PIPE_WANTW);
897 
898 		/*
899 		 * Retest space available after memory barrier above.
900 		 * Writes of size <= PIPE_BUF must be atomic.
901 		 */
902 		space = wpb->size - (wpb->windex - wpb->rindex);
903 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
904 			space = 0;
905 
906 		/*
907 		 * Retest EOF after memory barrier above.
908 		 */
909 		if (wpb->state & PIPE_WEOF) {
910 			error = EPIPE;
911 			break;
912 		}
913 
914 		/*
915 		 * We have no more space and have something to offer,
916 		 * wake up select/poll/kq.
917 		 */
918 		if (space == 0) {
919 			pipewakeup(wpb, 1);
920 			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
921 		}
922 
923 		/*
924 		 * Break out if we errored or the read side wants us to go
925 		 * away.
926 		 */
927 		if (error)
928 			break;
929 		if (wpb->state & PIPE_WEOF) {
930 			error = EPIPE;
931 			break;
932 		}
933 	}
934 	pipe_end_uio(&wpb->wip);
935 
936 	/*
937 	 * If we have put any characters in the buffer, we wake up
938 	 * the reader.
939 	 *
940 	 * Both rlock and wlock are required to be able to modify pipe_state.
941 	 */
942 	if (wpb->windex != wpb->rindex) {
943 		pipesignal(wpb, PIPE_WANTR);
944 		pipewakeup(wpb, 1);
945 	}
946 
947 	/*
948 	 * Don't return EPIPE if I/O was successful
949 	 */
950 	if ((wpb->rindex == wpb->windex) &&
951 	    (uio->uio_resid == 0) &&
952 	    (error == EPIPE)) {
953 		error = 0;
954 	}
955 
956 	if (error == 0 && wpb->lticks != ticks) {
957 		vfs_timestamp(&wpb->mtime);
958 		wpb->lticks = ticks;
959 	}
960 
961 	/*
962 	 * We have something to offer,
963 	 * wake up select/poll/kq.
964 	 */
965 	/*space = wpb->windex - wpb->rindex;*/
966 	lwkt_reltoken(&wpb->wlock);
967 
968 	return (error);
969 }
970 
971 /*
972  * We implement a very minimal set of ioctls for compatibility with sockets.
973  */
974 static int
975 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
976 	   struct ucred *cred, struct sysmsg *msg)
977 {
978 	struct pipebuf *rpb;
979 	struct pipe *pipe;
980 	int error;
981 
982 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
983 	if ((intptr_t)fp->f_data & 1) {
984 		rpb = &pipe->bufferB;
985 	} else {
986 		rpb = &pipe->bufferA;
987 	}
988 
989 	lwkt_gettoken(&rpb->rlock);
990 	lwkt_gettoken(&rpb->wlock);
991 
992 	switch (cmd) {
993 	case FIOASYNC:
994 		if (*(int *)data) {
995 			atomic_set_int(&rpb->state, PIPE_ASYNC);
996 		} else {
997 			atomic_clear_int(&rpb->state, PIPE_ASYNC);
998 		}
999 		error = 0;
1000 		break;
1001 	case FIONREAD:
1002 		*(int *)data = (int)(rpb->windex - rpb->rindex);
1003 		error = 0;
1004 		break;
1005 	case FIOSETOWN:
1006 		error = fsetown(*(int *)data, &rpb->sigio);
1007 		break;
1008 	case FIOGETOWN:
1009 		*(int *)data = fgetown(&rpb->sigio);
1010 		error = 0;
1011 		break;
1012 	case TIOCSPGRP:
1013 		/* This is deprecated, FIOSETOWN should be used instead. */
1014 		error = fsetown(-(*(int *)data), &rpb->sigio);
1015 		break;
1016 
1017 	case TIOCGPGRP:
1018 		/* This is deprecated, FIOGETOWN should be used instead. */
1019 		*(int *)data = -fgetown(&rpb->sigio);
1020 		error = 0;
1021 		break;
1022 	default:
1023 		error = ENOTTY;
1024 		break;
1025 	}
1026 	lwkt_reltoken(&rpb->wlock);
1027 	lwkt_reltoken(&rpb->rlock);
1028 
1029 	return (error);
1030 }
1031 
1032 /*
1033  * MPSAFE
1034  */
1035 static int
1036 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1037 {
1038 	struct pipebuf *rpb;
1039 	struct pipe *pipe;
1040 
1041 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1042 	if ((intptr_t)fp->f_data & 1) {
1043 		rpb = &pipe->bufferB;
1044 	} else {
1045 		rpb = &pipe->bufferA;
1046 	}
1047 
1048 	bzero((caddr_t)ub, sizeof(*ub));
1049 	ub->st_mode = S_IFIFO;
1050 	ub->st_blksize = rpb->size;
1051 	ub->st_size = rpb->windex - rpb->rindex;
1052 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1053 	ub->st_atimespec = rpb->atime;
1054 	ub->st_mtimespec = rpb->mtime;
1055 	ub->st_ctimespec = pipe->ctime;
1056 	ub->st_uid = fp->f_cred->cr_uid;
1057 	ub->st_gid = fp->f_cred->cr_gid;
1058 	ub->st_ino = pipe->inum;
1059 	/*
1060 	 * Left as 0: st_dev, st_nlink, st_rdev,
1061 	 * st_flags, st_gen.
1062 	 * XXX (st_dev, st_ino) should be unique.
1063 	 */
1064 
1065 	return (0);
1066 }
1067 
1068 static int
1069 pipe_close(struct file *fp)
1070 {
1071 	struct pipebuf *rpb;
1072 	struct pipebuf *wpb;
1073 	struct pipe *pipe;
1074 
1075 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1076 	if ((intptr_t)fp->f_data & 1) {
1077 		rpb = &pipe->bufferB;
1078 		wpb = &pipe->bufferA;
1079 	} else {
1080 		rpb = &pipe->bufferA;
1081 		wpb = &pipe->bufferB;
1082 	}
1083 
1084 	fp->f_ops = &badfileops;
1085 	fp->f_data = NULL;
1086 	funsetown(&rpb->sigio);
1087 	pipeclose(pipe, rpb, wpb);
1088 
1089 	return (0);
1090 }
1091 
1092 /*
1093  * Shutdown one or both directions of a full-duplex pipe.
1094  */
1095 static int
1096 pipe_shutdown(struct file *fp, int how)
1097 {
1098 	struct pipebuf *rpb;
1099 	struct pipebuf *wpb;
1100 	struct pipe *pipe;
1101 	int error = EPIPE;
1102 
1103 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1104 	if ((intptr_t)fp->f_data & 1) {
1105 		rpb = &pipe->bufferB;
1106 		wpb = &pipe->bufferA;
1107 	} else {
1108 		rpb = &pipe->bufferA;
1109 		wpb = &pipe->bufferB;
1110 	}
1111 
1112 	/*
1113 	 * We modify pipe_state on both pipes, which means we need
1114 	 * all four tokens!
1115 	 */
1116 	lwkt_gettoken(&rpb->rlock);
1117 	lwkt_gettoken(&rpb->wlock);
1118 	lwkt_gettoken(&wpb->rlock);
1119 	lwkt_gettoken(&wpb->wlock);
1120 
1121 	switch(how) {
1122 	case SHUT_RDWR:
1123 	case SHUT_RD:
1124 		/*
1125 		 * EOF on my reads and peer writes
1126 		 */
1127 		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1128 		if (rpb->state & PIPE_WANTR) {
1129 			rpb->state &= ~PIPE_WANTR;
1130 			wakeup(rpb);
1131 		}
1132 		if (rpb->state & PIPE_WANTW) {
1133 			rpb->state &= ~PIPE_WANTW;
1134 			wakeup(rpb);
1135 		}
1136 		error = 0;
1137 		if (how == SHUT_RD)
1138 			break;
1139 		/* fall through */
1140 	case SHUT_WR:
1141 		/*
1142 		 * EOF on peer reads and my writes
1143 		 */
1144 		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1145 		if (wpb->state & PIPE_WANTR) {
1146 			wpb->state &= ~PIPE_WANTR;
1147 			wakeup(wpb);
1148 		}
1149 		if (wpb->state & PIPE_WANTW) {
1150 			wpb->state &= ~PIPE_WANTW;
1151 			wakeup(wpb);
1152 		}
1153 		error = 0;
1154 		break;
1155 	}
1156 	pipewakeup(rpb, 1);
1157 	pipewakeup(wpb, 1);
1158 
1159 	lwkt_reltoken(&wpb->wlock);
1160 	lwkt_reltoken(&wpb->rlock);
1161 	lwkt_reltoken(&rpb->wlock);
1162 	lwkt_reltoken(&rpb->rlock);
1163 
1164 	return (error);
1165 }
1166 
1167 /*
1168  * Destroy the pipe buffer.
1169  */
1170 static void
1171 pipe_free_kmem(struct pipebuf *pb)
1172 {
1173 	if (pb->buffer != NULL) {
1174 		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1175 		pb->buffer = NULL;
1176 		pb->object = NULL;
1177 	}
1178 }
1179 
1180 /*
1181  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1182  * and writing on wpb.  This routine must be called twice with the pipebufs
1183  * reversed to close both directions.
1184  */
1185 static void
1186 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1187 {
1188 	globaldata_t gd;
1189 
1190 	if (pipe == NULL)
1191 		return;
1192 
1193 	/*
1194 	 * We need both the read and write tokens to modify pipe_state.
1195 	 */
1196 	lwkt_gettoken(&rpb->rlock);
1197 	lwkt_gettoken(&rpb->wlock);
1198 
1199 	/*
1200 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1201 	 * wakeup anyone blocked on our pipe.  No action if our side
1202 	 * is already closed.
1203 	 */
1204 	if (rpb->state & PIPE_CLOSED) {
1205 		lwkt_reltoken(&rpb->wlock);
1206 		lwkt_reltoken(&rpb->rlock);
1207 		return;
1208 	}
1209 
1210 	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1211 	pipewakeup(rpb, 1);
1212 	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1213 		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1214 		wakeup(rpb);
1215 	}
1216 	lwkt_reltoken(&rpb->wlock);
1217 	lwkt_reltoken(&rpb->rlock);
1218 
1219 	/*
1220 	 * Disconnect from peer.
1221 	 */
1222 	lwkt_gettoken(&wpb->rlock);
1223 	lwkt_gettoken(&wpb->wlock);
1224 
1225 	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1226 	pipewakeup(wpb, 1);
1227 	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1228 		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1229 		wakeup(wpb);
1230 	}
1231 	if (SLIST_FIRST(&wpb->kq.ki_note))
1232 		KNOTE(&wpb->kq.ki_note, 0);
1233 	lwkt_reltoken(&wpb->wlock);
1234 	lwkt_reltoken(&wpb->rlock);
1235 
1236 	/*
1237 	 * Free resources once both sides are closed.  We maintain a pcpu
1238 	 * cache to improve performance, so the actual tear-down case is
1239 	 * limited to bulk situations.
1240 	 *
1241 	 * However, the bulk tear-down case can cause intense contention
1242 	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1243 	 * of processes are killed at the same time.  To deal with this we
1244 	 * use a pcpu mutex to maintain concurrency but also limit the
1245 	 * number of threads banging on the map and pmap.
1246 	 *
1247 	 * We use the mtx mechanism instead of the lockmgr mechanism because
1248 	 * the mtx mechanism utilizes a queued design which will not break
1249 	 * down in the face of thousands to hundreds of thousands of
1250 	 * processes trying to free pipes simultaneously.  The lockmgr
1251 	 * mechanism will wind up waking them all up each time a lock
1252 	 * cycles.
1253 	 */
1254 	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1255 		gd = mycpu;
1256 		if (gd->gd_pipeqcount >= pipe_maxcache) {
1257 			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1258 			pipe_free_kmem(rpb);
1259 			pipe_free_kmem(wpb);
1260 			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1261 			kfree(pipe, M_PIPE);
1262 		} else {
1263 			rpb->state = 0;
1264 			wpb->state = 0;
1265 			pipe->next = gd->gd_pipeq;
1266 			gd->gd_pipeq = pipe;
1267 			++gd->gd_pipeqcount;
1268 		}
1269 	}
1270 }
1271 
1272 static int
1273 pipe_kqfilter(struct file *fp, struct knote *kn)
1274 {
1275 	struct pipebuf *rpb;
1276 	struct pipebuf *wpb;
1277 	struct pipe *pipe;
1278 
1279 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1280 	if ((intptr_t)fp->f_data & 1) {
1281 		rpb = &pipe->bufferB;
1282 		wpb = &pipe->bufferA;
1283 	} else {
1284 		rpb = &pipe->bufferA;
1285 		wpb = &pipe->bufferB;
1286 	}
1287 
1288 	switch (kn->kn_filter) {
1289 	case EVFILT_READ:
1290 		kn->kn_fop = &pipe_rfiltops;
1291 		break;
1292 	case EVFILT_WRITE:
1293 		kn->kn_fop = &pipe_wfiltops;
1294 		if (wpb->state & PIPE_CLOSED) {
1295 			/* other end of pipe has been closed */
1296 			return (EPIPE);
1297 		}
1298 		break;
1299 	default:
1300 		return (EOPNOTSUPP);
1301 	}
1302 
1303 	if (rpb == &pipe->bufferA)
1304 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1305 	else
1306 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1307 
1308 	knote_insert(&rpb->kq.ki_note, kn);
1309 
1310 	return (0);
1311 }
1312 
1313 static void
1314 filt_pipedetach(struct knote *kn)
1315 {
1316 	struct pipebuf *rpb;
1317 	struct pipebuf *wpb;
1318 	struct pipe *pipe;
1319 
1320 	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1321 	if ((intptr_t)kn->kn_hook & 1) {
1322 		rpb = &pipe->bufferB;
1323 		wpb = &pipe->bufferA;
1324 	} else {
1325 		rpb = &pipe->bufferA;
1326 		wpb = &pipe->bufferB;
1327 	}
1328 	knote_remove(&rpb->kq.ki_note, kn);
1329 }
1330 
1331 /*ARGSUSED*/
1332 static int
1333 filt_piperead(struct knote *kn, long hint)
1334 {
1335 	struct pipebuf *rpb;
1336 	struct pipebuf *wpb;
1337 	struct pipe *pipe;
1338 	int ready = 0;
1339 
1340 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1341 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1342 		rpb = &pipe->bufferB;
1343 		wpb = &pipe->bufferA;
1344 	} else {
1345 		rpb = &pipe->bufferA;
1346 		wpb = &pipe->bufferB;
1347 	}
1348 
1349 	/*
1350 	 * We shouldn't need the pipe locks because the knote itself is
1351 	 * locked via KN_PROCESSING.  If we lose a race against the writer,
1352 	 * the writer will just issue a KNOTE() after us.
1353 	 */
1354 #if 0
1355 	lwkt_gettoken(&rpb->rlock);
1356 	lwkt_gettoken(&rpb->wlock);
1357 #endif
1358 
1359 	kn->kn_data = rpb->windex - rpb->rindex;
1360 	if (kn->kn_data < 0)
1361 		kn->kn_data = 0;
1362 
1363 	if (rpb->state & PIPE_REOF) {
1364 		/*
1365 		 * Only set NODATA if all data has been exhausted
1366 		 */
1367 		if (kn->kn_data == 0)
1368 			kn->kn_flags |= EV_NODATA;
1369 		kn->kn_flags |= EV_EOF;
1370 		ready = 1;
1371 	}
1372 
1373 #if 0
1374 	lwkt_reltoken(&rpb->wlock);
1375 	lwkt_reltoken(&rpb->rlock);
1376 #endif
1377 
1378 	if (!ready)
1379 		ready = kn->kn_data > 0;
1380 
1381 	return (ready);
1382 }
1383 
1384 /*ARGSUSED*/
1385 static int
1386 filt_pipewrite(struct knote *kn, long hint)
1387 {
1388 	struct pipebuf *rpb;
1389 	struct pipebuf *wpb;
1390 	struct pipe *pipe;
1391 	int ready = 0;
1392 
1393 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1394 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1395 		rpb = &pipe->bufferB;
1396 		wpb = &pipe->bufferA;
1397 	} else {
1398 		rpb = &pipe->bufferA;
1399 		wpb = &pipe->bufferB;
1400 	}
1401 
1402 	kn->kn_data = 0;
1403 	if (wpb->state & PIPE_CLOSED) {
1404 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1405 		return (1);
1406 	}
1407 
1408 	/*
1409 	 * We shouldn't need the pipe locks because the knote itself is
1410 	 * locked via KN_PROCESSING.  If we lose a race against the reader,
1411 	 * the reader will just issue a KNOTE() after us.
1412 	 */
1413 #if 0
1414 	lwkt_gettoken(&wpb->rlock);
1415 	lwkt_gettoken(&wpb->wlock);
1416 #endif
1417 
1418 	if (wpb->state & PIPE_WEOF) {
1419 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1420 		ready = 1;
1421 	}
1422 
1423 	if (!ready) {
1424 		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1425 		if (kn->kn_data < 0)
1426 			kn->kn_data = 0;
1427 	}
1428 
1429 #if 0
1430 	lwkt_reltoken(&wpb->wlock);
1431 	lwkt_reltoken(&wpb->rlock);
1432 #endif
1433 
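	/*
	 * Only report the descriptor as writable when at least PIPE_BUF
	 * bytes of space are available, i.e. when a write of up to
	 * PIPE_BUF bytes can complete without blocking or being split.
	 */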
1434 	if (!ready)
1435 		ready = kn->kn_data >= PIPE_BUF;
1436 
1437 	return (ready);
1438 }
1439