xref: /dragonfly/sys/kern/vfs_journal.c (revision f2c43266)
1 /*
2  * Copyright (c) 2004-2006 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.33 2007/05/09 00:53:34 dillon Exp $
35  */
36 /*
37  * The journaling protocol is intended to evolve into a two-way stream
38  * whereby transaction IDs can be acknowledged by the journaling target
39  * when the data has been committed to hard storage.  Both implicit and
40  * explicit acknowledgement schemes will be supported, depending on the
41  * sophistication of the journaling stream, plus resynchronization and
42  * restart when a journaling stream is interrupted.  This information will
43  * also be made available to journaling-aware filesystems to allow better
44  * management of their own physical storage synchronization mechanisms as
45  * well as to allow such filesystems to take direct advantage of the kernel's
46  * journaling layer so they don't have to roll their own.
47  *
48  * In addition, the worker thread will have access to much larger
49  * spooling areas then the memory buffer is able to provide by e.g.
50  * reserving swap space, in order to absorb potentially long interruptions
51  * of off-site journaling streams, and to prevent 'slow' off-site linkages
52  * from radically slowing down local filesystem operations.
53  *
54  * Because of the non-trivial algorithms the journaling system will be
55  * required to support, use of a worker thread is mandatory.  Efficiencies
56  * are maintained by utilitizing the memory FIFO to batch transactions when
57  * possible, reducing the number of gratuitous thread switches and taking
58  * advantage of cpu caches through the use of shorter batched code paths
59  * rather then trying to do everything in the context of the process
60  * originating the filesystem op.  In the future the memory FIFO can be
61  * made per-cpu to remove BGL or other locking requirements.
62  */
63 #include <sys/param.h>
64 #include <sys/systm.h>
65 #include <sys/buf.h>
66 #include <sys/conf.h>
67 #include <sys/kernel.h>
68 #include <sys/queue.h>
69 #include <sys/lock.h>
70 #include <sys/malloc.h>
71 #include <sys/mount.h>
72 #include <sys/unistd.h>
73 #include <sys/vnode.h>
74 #include <sys/poll.h>
75 #include <sys/mountctl.h>
76 #include <sys/journal.h>
77 #include <sys/file.h>
78 #include <sys/proc.h>
79 #include <sys/xio.h>
80 #include <sys/socket.h>
81 #include <sys/socketvar.h>
82 
83 #include <machine/limits.h>
84 
85 #include <vm/vm.h>
86 #include <vm/vm_object.h>
87 #include <vm/vm_page.h>
88 #include <vm/vm_pager.h>
89 #include <vm/vnode_pager.h>
90 
91 #include <sys/file2.h>
92 #include <sys/thread2.h>
93 #include <sys/mplock2.h>
94 #include <sys/spinlock2.h>
95 
96 static void journal_wthread(void *info);
97 static void journal_rthread(void *info);
98 
99 static void *journal_reserve(struct journal *jo,
100                         struct journal_rawrecbeg **rawpp,
101                         int16_t streamid, int bytes);
102 static void *journal_extend(struct journal *jo,
103                         struct journal_rawrecbeg **rawpp,
104                         int truncbytes, int bytes, int *newstreamrecp);
105 static void journal_abort(struct journal *jo,
106                         struct journal_rawrecbeg **rawpp);
107 static void journal_commit(struct journal *jo,
108                         struct journal_rawrecbeg **rawpp,
109                         int bytes, int closeout);
110 static void jrecord_data(struct jrecord *jrec,
111 			void *buf, int bytes, int dtype);
112 
113 
114 MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
115 MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
116 
117 void
118 journal_create_threads(struct journal *jo)
119 {
120 	jo->flags &= ~(MC_JOURNAL_STOP_REQ | MC_JOURNAL_STOP_IMM);
121 	jo->flags |= MC_JOURNAL_WACTIVE;
122 	lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
123 		    TDF_NOSTART, -1,
124 		    "journal w:%.*s", JIDMAX, jo->id);
125 	lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
126 	lwkt_schedule(&jo->wthread);
127 
128 	if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
129 	    jo->flags |= MC_JOURNAL_RACTIVE;
130 	    lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
131 			TDF_NOSTART, -1,
132 			"journal r:%.*s", JIDMAX, jo->id);
133 	    lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
134 	    lwkt_schedule(&jo->rthread);
135 	}
136 }
137 
138 void
139 journal_destroy_threads(struct journal *jo, int flags)
140 {
141     int wcount;
142 
143     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
144     wakeup(&jo->fifo);
145     wcount = 0;
146     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
147 	tsleep(jo, 0, "jwait", hz);
148 	if (++wcount % 10 == 0) {
149 	    kprintf("Warning: journal %s waiting for descriptors to close\n",
150 		jo->id);
151 	}
152     }
153 
154     /*
155      * XXX SMP - threads should move to cpu requesting the restart or
156      * termination before finishing up to properly interlock.
157      */
158     tsleep(jo, 0, "jwait", hz);
159     lwkt_free_thread(&jo->wthread);
160     if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
161 	lwkt_free_thread(&jo->rthread);
162 }
163 
164 /*
165  * The per-journal worker thread is responsible for writing out the
166  * journal's FIFO to the target stream.
167  */
168 static void
169 journal_wthread(void *info)
170 {
171     struct journal *jo = info;
172     struct journal_rawrecbeg *rawp;
173     int error;
174     size_t avail;
175     size_t bytes;
176     size_t res;
177 
178     /* not MPSAFE yet */
179     get_mplock();
180 
181     for (;;) {
182 	/*
183 	 * Calculate the number of bytes available to write.  This buffer
184 	 * area may contain reserved records so we can't just write it out
185 	 * without further checks.
186 	 */
187 	bytes = jo->fifo.windex - jo->fifo.rindex;
188 
189 	/*
190 	 * sleep if no bytes are available or if an incomplete record is
191 	 * encountered (it needs to be filled in before we can write it
192 	 * out), and skip any pad records that we encounter.
193 	 */
194 	if (bytes == 0) {
195 	    if (jo->flags & MC_JOURNAL_STOP_REQ)
196 		break;
197 	    tsleep(&jo->fifo, 0, "jfifo", hz);
198 	    continue;
199 	}
200 
201 	/*
202 	 * Sleep if we can not go any further due to hitting an incomplete
203 	 * record.  This case should occur rarely but may have to be better
204 	 * optimized XXX.
205 	 */
206 	rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
207 	if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
208 	    tsleep(&jo->fifo, 0, "jpad", hz);
209 	    continue;
210 	}
211 
212 	/*
213 	 * Skip any pad records.  We do not write out pad records if we can
214 	 * help it.
215 	 */
216 	if (rawp->streamid == JREC_STREAMID_PAD) {
217 	    if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
218 		if (jo->fifo.rindex == jo->fifo.xindex) {
219 		    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
220 		    jo->total_acked += (rawp->recsize + 15) & ~15;
221 		}
222 	    }
223 	    jo->fifo.rindex += (rawp->recsize + 15) & ~15;
224 	    jo->total_acked += bytes;
225 	    KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
226 	    continue;
227 	}
228 
229 	/*
230 	 * 'bytes' is the amount of data that can potentially be written out.
231 	 * Calculate 'res', the amount of data that can actually be written
232 	 * out.  res is bounded either by hitting the end of the physical
233 	 * memory buffer or by hitting an incomplete record.  Incomplete
234 	 * records often occur due to the way the space reservation model
235 	 * works.
236 	 */
237 	res = 0;
238 	avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
239 	while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
240 	    res += (rawp->recsize + 15) & ~15;
241 	    if (res >= avail) {
242 		KKASSERT(res == avail);
243 		break;
244 	    }
245 	    rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
246 	}
247 
248 	/*
249 	 * Issue the write and deal with any errors or other conditions.
250 	 * For now assume blocking I/O.  Since we are record-aware the
251 	 * code cannot yet handle partial writes.
252 	 *
253 	 * We bump rindex prior to issuing the write to avoid racing
254 	 * the acknowledgement coming back (which could prevent the ack
255 	 * from bumping xindex).  Restarts are always based on xindex so
256 	 * we do not try to undo the rindex if an error occurs.
257 	 *
258 	 * XXX EWOULDBLOCK/NBIO
259 	 * XXX notification on failure
260 	 * XXX permanent verses temporary failures
261 	 * XXX two-way acknowledgement stream in the return direction / xindex
262 	 */
263 	bytes = res;
264 	jo->fifo.rindex += bytes;
265 	error = fp_write(jo->fp,
266 			jo->fifo.membase +
267 			 ((jo->fifo.rindex - bytes) & jo->fifo.mask),
268 			bytes, &res, UIO_SYSSPACE);
269 	if (error) {
270 	    kprintf("journal_thread(%s) write, error %d\n", jo->id, error);
271 	    /* XXX */
272 	} else {
273 	    KKASSERT(res == bytes);
274 	}
275 
276 	/*
277 	 * Advance rindex.  If the journal stream is not full duplex we also
278 	 * advance xindex, otherwise the rjournal thread is responsible for
279 	 * advancing xindex.
280 	 */
281 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
282 	    jo->fifo.xindex += bytes;
283 	    jo->total_acked += bytes;
284 	}
285 	KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
286 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
287 	    if (jo->flags & MC_JOURNAL_WWAIT) {
288 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
289 		wakeup(&jo->fifo.windex);
290 	    }
291 	}
292     }
293     fp_shutdown(jo->fp, SHUT_WR);
294     jo->flags &= ~MC_JOURNAL_WACTIVE;
295     wakeup(jo);
296     wakeup(&jo->fifo.windex);
297     rel_mplock();
298 }
299 
300 /*
301  * A second per-journal worker thread is created for two-way journaling
302  * streams to deal with the return acknowledgement stream.
303  */
304 static void
305 journal_rthread(void *info)
306 {
307     struct journal_rawrecbeg *rawp;
308     struct journal_ackrecord ack;
309     struct journal *jo = info;
310     int64_t transid;
311     int error;
312     size_t count;
313     size_t bytes;
314 
315     transid = 0;
316     error = 0;
317 
318     /* not MPSAFE yet */
319     get_mplock();
320 
321     for (;;) {
322 	/*
323 	 * We have been asked to stop
324 	 */
325 	if (jo->flags & MC_JOURNAL_STOP_REQ)
326 		break;
327 
328 	/*
329 	 * If we have no active transaction id, get one from the return
330 	 * stream.
331 	 */
332 	if (transid == 0) {
333 	    error = fp_read(jo->fp, &ack, sizeof(ack), &count,
334 			    1, UIO_SYSSPACE);
335 #if 0
336 	    kprintf("fp_read ack error %d count %d\n", error, count);
337 #endif
338 	    if (error || count != sizeof(ack))
339 		break;
340 	    if (error) {
341 		kprintf("read error %d on receive stream\n", error);
342 		break;
343 	    }
344 	    if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
345 		ack.rend.endmagic != JREC_ENDMAGIC
346 	    ) {
347 		kprintf("bad begmagic or endmagic on receive stream\n");
348 		break;
349 	    }
350 	    transid = ack.rbeg.transid;
351 	}
352 
353 	/*
354 	 * Calculate the number of unacknowledged bytes.  If there are no
355 	 * unacknowledged bytes then unsent data was acknowledged, report,
356 	 * sleep a bit, and loop in that case.  This should not happen
357 	 * normally.  The ack record is thrown away.
358 	 */
359 	bytes = jo->fifo.rindex - jo->fifo.xindex;
360 
361 	if (bytes == 0) {
362 	    kprintf("warning: unsent data acknowledged transid %08llx\n",
363 		    (long long)transid);
364 	    tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
365 	    transid = 0;
366 	    continue;
367 	}
368 
369 	/*
370 	 * Since rindex has advanced, the record pointed to by xindex
371 	 * must be a valid record.
372 	 */
373 	rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
374 	KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
375 	KKASSERT(rawp->recsize <= bytes);
376 
377 	/*
378 	 * The target can acknowledge several records at once.
379 	 */
380 	if (rawp->transid < transid) {
381 #if 1
382 	    kprintf("ackskip %08llx/%08llx\n",
383 		    (long long)rawp->transid,
384 		    (long long)transid);
385 #endif
386 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
387 	    jo->total_acked += (rawp->recsize + 15) & ~15;
388 	    if (jo->flags & MC_JOURNAL_WWAIT) {
389 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
390 		wakeup(&jo->fifo.windex);
391 	    }
392 	    continue;
393 	}
394 	if (rawp->transid == transid) {
395 #if 1
396 	    kprintf("ackskip %08llx/%08llx\n",
397 		    (long long)rawp->transid,
398 		    (long long)transid);
399 #endif
400 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
401 	    jo->total_acked += (rawp->recsize + 15) & ~15;
402 	    if (jo->flags & MC_JOURNAL_WWAIT) {
403 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
404 		wakeup(&jo->fifo.windex);
405 	    }
406 	    transid = 0;
407 	    continue;
408 	}
409 	kprintf("warning: unsent data(2) acknowledged transid %08llx\n",
410 		(long long)transid);
411 	transid = 0;
412     }
413     jo->flags &= ~MC_JOURNAL_RACTIVE;
414     wakeup(jo);
415     wakeup(&jo->fifo.windex);
416     rel_mplock();
417 }
418 
419 /*
420  * This builds a pad record which the journaling thread will skip over.  Pad
421  * records are required when we are unable to reserve sufficient stream space
422  * due to insufficient space at the end of the physical memory fifo.
423  *
424  * Even though the record is not transmitted, a normal transid must be
425  * assigned to it so link recovery operations after a failure work properly.
426  */
427 static
428 void
429 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
430 {
431     struct journal_rawrecend *rendp;
432 
433     KKASSERT((recsize & 15) == 0 && recsize >= 16);
434 
435     rawp->streamid = JREC_STREAMID_PAD;
436     rawp->recsize = recsize;	/* must be 16-byte aligned */
437     rawp->transid = transid;
438     /*
439      * WARNING, rendp may overlap rawp->transid.  This is necessary to
440      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
441      * hopefully cause the compiler to not make any assumptions.
442      */
443     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
444     rendp->endmagic = JREC_ENDMAGIC;
445     rendp->check = 0;
446     rendp->recsize = rawp->recsize;
447 
448     /*
449      * Set the begin magic last.  This is what will allow the journal
450      * thread to write the record out.  Use a store fence to prevent
451      * compiler and cpu reordering of the writes.
452      */
453     cpu_sfence();
454     rawp->begmagic = JREC_BEGMAGIC;
455 }
456 
457 /*
458  * Wake up the worker thread if the FIFO is more then half full or if
459  * someone is waiting for space to be freed up.  Otherwise let the
460  * heartbeat deal with it.  Being able to avoid waking up the worker
461  * is the key to the journal's cpu performance.
462  */
463 static __inline
464 void
465 journal_commit_wakeup(struct journal *jo)
466 {
467     int avail;
468 
469     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
470     KKASSERT(avail >= 0);
471     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
472 	wakeup(&jo->fifo);
473 }
474 
475 /*
476  * Create a new BEGIN stream record with the specified streamid and the
477  * specified amount of payload space.  *rawpp will be set to point to the
478  * base of the new stream record and a pointer to the base of the payload
479  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
480  * making this call.  The raw record header will be partially initialized.
481  *
482  * A stream can be extended, aborted, or committed by other API calls
483  * below.  This may result in a sequence of potentially disconnected
484  * stream records to be output to the journaling target.  The first record
485  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
486  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
487  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
488  * up being the same as the first, in which case the bits are all set in
489  * the first record.
490  *
491  * The stream record is created in an incomplete state by setting the begin
492  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
493  * flushing the fifo past our record until we have finished populating it.
494  * Other threads can reserve and operate on their own space without stalling
495  * but the stream output will stall until we have completed operations.  The
496  * memory FIFO is intended to be large enough to absorb such situations
497  * without stalling out other threads.
498  */
499 static
500 void *
501 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
502 		int16_t streamid, int bytes)
503 {
504     struct journal_rawrecbeg *rawp;
505     int avail;
506     int availtoend;
507     int req;
508 
509     /*
510      * Add header and trailer overheads to the passed payload.  Note that
511      * the passed payload size need not be aligned in any way.
512      */
513     bytes += sizeof(struct journal_rawrecbeg);
514     bytes += sizeof(struct journal_rawrecend);
515 
516     for (;;) {
517 	/*
518 	 * First, check boundary conditions.  If the request would wrap around
519 	 * we have to skip past the ending block and return to the beginning
520 	 * of the FIFO's buffer.  Calculate 'req' which is the actual number
521 	 * of bytes being reserved, including wrap-around dead space.
522 	 *
523 	 * Neither 'bytes' or 'req' are aligned.
524 	 *
525 	 * Note that availtoend is not truncated to avail and so cannot be
526 	 * used to determine whether the reservation is possible by itself.
527 	 * Also, since all fifo ops are 16-byte aligned, we can check
528 	 * the size before calculating the aligned size.
529 	 */
530 	availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
531 	KKASSERT((availtoend & 15) == 0);
532 	if (bytes > availtoend)
533 	    req = bytes + availtoend;	/* add pad to end */
534 	else
535 	    req = bytes;
536 
537 	/*
538 	 * Next calculate the total available space and see if it is
539 	 * sufficient.  We cannot overwrite previously buffered data
540 	 * past xindex because otherwise we would not be able to restart
541 	 * a broken link at the target's last point of commit.
542 	 */
543 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
544 	KKASSERT(avail >= 0 && (avail & 15) == 0);
545 
546 	if (avail < req) {
547 	    /* XXX MC_JOURNAL_STOP_IMM */
548 	    jo->flags |= MC_JOURNAL_WWAIT;
549 	    ++jo->fifostalls;
550 	    tsleep(&jo->fifo.windex, 0, "jwrite", 0);
551 	    continue;
552 	}
553 
554 	/*
555 	 * Create a pad record for any dead space and create an incomplete
556 	 * record for the live space, then return a pointer to the
557 	 * contiguous buffer space that was requested.
558 	 *
559 	 * NOTE: The worker thread will not flush past an incomplete
560 	 * record, so the reserved space can be filled in at-will.  The
561 	 * journaling code must also be aware the reserved sections occuring
562 	 * after this one will also not be written out even if completed
563 	 * until this one is completed.
564 	 *
565 	 * The transaction id must accomodate real and potential pad creation.
566 	 */
567 	rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
568 	if (req != bytes) {
569 	    journal_build_pad(rawp, availtoend, jo->transid);
570 	    ++jo->transid;
571 	    rawp = (void *)jo->fifo.membase;
572 	}
573 	rawp->begmagic = JREC_INCOMPLETEMAGIC;	/* updated by abort/commit */
574 	rawp->recsize = bytes;			/* (unaligned size) */
575 	rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
576 	rawp->transid = jo->transid;
577 	jo->transid += 2;
578 
579 	/*
580 	 * Issue a memory barrier to guarentee that the record data has been
581 	 * properly initialized before we advance the write index and return
582 	 * a pointer to the reserved record.  Otherwise the worker thread
583 	 * could accidently run past us.
584 	 *
585 	 * Note that stream records are always 16-byte aligned.
586 	 */
587 	cpu_sfence();
588 	jo->fifo.windex += (req + 15) & ~15;
589 	*rawpp = rawp;
590 	return(rawp + 1);
591     }
592     /* not reached */
593     *rawpp = NULL;
594     return(NULL);
595 }
596 
597 /*
598  * Attempt to extend the stream record by <bytes> worth of payload space.
599  *
600  * If it is possible to extend the existing stream record no truncation
601  * occurs and the record is extended as specified.  A pointer to the
602  * truncation offset within the payload space is returned.
603  *
604  * If it is not possible to do this the existing stream record is truncated
605  * and committed, and a new stream record of size <bytes> is created.  A
606  * pointer to the base of the new stream record's payload space is returned.
607  *
608  * *rawpp is set to the new reservation in the case of a new record but
609  * the caller cannot depend on a comparison with the old rawp to determine if
610  * this case occurs because we could end up using the same memory FIFO
611  * offset for the new stream record.  Use *newstreamrecp instead.
612  */
613 static void *
614 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp,
615 		int truncbytes, int bytes, int *newstreamrecp)
616 {
617     struct journal_rawrecbeg *rawp;
618     int16_t streamid;
619     int availtoend;
620     int avail;
621     int osize;
622     int nsize;
623     int wbase;
624     void *rptr;
625 
626     *newstreamrecp = 0;
627     rawp = *rawpp;
628     osize = (rawp->recsize + 15) & ~15;
629     nsize = (rawp->recsize + bytes + 15) & ~15;
630     wbase = (char *)rawp - jo->fifo.membase;
631 
632     /*
633      * If the aligned record size does not change we can trivially adjust
634      * the record size.
635      */
636     if (nsize == osize) {
637 	rawp->recsize += bytes;
638 	return((char *)(rawp + 1) + truncbytes);
639     }
640 
641     /*
642      * If the fifo's write index hasn't been modified since we made the
643      * reservation and we do not hit any boundary conditions, we can
644      * trivially make the record smaller or larger.
645      */
646     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
647 	availtoend = jo->fifo.size - wbase;
648 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
649 	KKASSERT((availtoend & 15) == 0);
650 	KKASSERT((avail & 15) == 0);
651 	if (nsize <= avail && nsize <= availtoend) {
652 	    jo->fifo.windex += nsize - osize;
653 	    rawp->recsize += bytes;
654 	    return((char *)(rawp + 1) + truncbytes);
655 	}
656     }
657 
658     /*
659      * It was not possible to extend the buffer.  Commit the current
660      * buffer and create a new one.  We manually clear the BEGIN mark that
661      * journal_reserve() creates (because this is a continuing record, not
662      * the start of a new stream).
663      */
664     streamid = rawp->streamid & JREC_STREAMID_MASK;
665     journal_commit(jo, rawpp, truncbytes, 0);
666     rptr = journal_reserve(jo, rawpp, streamid, bytes);
667     rawp = *rawpp;
668     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
669     *newstreamrecp = 1;
670     return(rptr);
671 }
672 
673 /*
674  * Abort a journal record.  If the transaction record represents a stream
675  * BEGIN and we can reverse the fifo's write index we can simply reverse
676  * index the entire record, as if it were never reserved in the first place.
677  *
678  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
679  * with the payload truncated to 0 bytes.
680  */
681 static void
682 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
683 {
684     struct journal_rawrecbeg *rawp;
685     int osize;
686 
687     rawp = *rawpp;
688     osize = (rawp->recsize + 15) & ~15;
689 
690     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
691 	(jo->fifo.windex & jo->fifo.mask) ==
692 	 (char *)rawp - jo->fifo.membase + osize)
693     {
694 	jo->fifo.windex -= osize;
695 	*rawpp = NULL;
696     } else {
697 	rawp->streamid |= JREC_STREAMCTL_ABORTED;
698 	journal_commit(jo, rawpp, 0, 1);
699     }
700 }
701 
702 /*
703  * Commit a journal record and potentially truncate it to the specified
704  * number of payload bytes.  If you do not want to truncate the record,
705  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
706  * field includes header and trailer and will not be correct.  Note that
707  * passing 0 will truncate the entire data payload of the record.
708  *
709  * The logical stream is terminated by this function.
710  *
711  * If truncation occurs, and it is not possible to physically optimize the
712  * memory FIFO due to other threads having reserved space after ours,
713  * the remaining reserved space will be covered by a pad record.
714  */
715 static void
716 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
717 		int bytes, int closeout)
718 {
719     struct journal_rawrecbeg *rawp;
720     struct journal_rawrecend *rendp;
721     int osize;
722     int nsize;
723 
724     rawp = *rawpp;
725     *rawpp = NULL;
726 
727     KKASSERT((char *)rawp >= jo->fifo.membase &&
728 	     (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
729     KKASSERT(((intptr_t)rawp & 15) == 0);
730 
731     /*
732      * Truncate the record if necessary.  If the FIFO write index as still
733      * at the end of our record we can optimally backindex it.  Otherwise
734      * we have to insert a pad record to cover the dead space.
735      *
736      * We calculate osize which is the 16-byte-aligned original recsize.
737      * We calculate nsize which is the 16-byte-aligned new recsize.
738      *
739      * Due to alignment issues or in case the passed truncation bytes is
740      * the same as the original payload, nsize may be equal to osize even
741      * if the committed bytes is less then the originally reserved bytes.
742      */
743     if (bytes >= 0) {
744 	KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
745 	osize = (rawp->recsize + 15) & ~15;
746 	rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
747 			sizeof(struct journal_rawrecend);
748 	nsize = (rawp->recsize + 15) & ~15;
749 	KKASSERT(nsize <= osize);
750 	if (osize == nsize) {
751 	    /* do nothing */
752 	} else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
753 	    /* we are able to backindex the fifo */
754 	    jo->fifo.windex -= osize - nsize;
755 	} else {
756 	    /* we cannot backindex the fifo, emplace a pad in the dead space */
757 	    journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
758 				rawp->transid + 1);
759 	}
760     }
761 
762     /*
763      * Fill in the trailer.  Note that unlike pad records, the trailer will
764      * never overlap the header.
765      */
766     rendp = (void *)((char *)rawp +
767 	    ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
768     rendp->endmagic = JREC_ENDMAGIC;
769     rendp->recsize = rawp->recsize;
770     rendp->check = 0;		/* XXX check word, disabled for now */
771 
772     /*
773      * Fill in begmagic last.  This will allow the worker thread to proceed.
774      * Use a memory barrier to guarentee write ordering.  Mark the stream
775      * as terminated if closeout is set.  This is the typical case.
776      */
777     if (closeout)
778 	rawp->streamid |= JREC_STREAMCTL_END;
779     cpu_sfence();		/* memory and compiler barrier */
780     rawp->begmagic = JREC_BEGMAGIC;
781 
782     journal_commit_wakeup(jo);
783 }
784 
785 /************************************************************************
786  *			TRANSACTION SUPPORT ROUTINES			*
787  ************************************************************************
788  *
789  * JRECORD_*() - routines to create subrecord transactions and embed them
790  *		 in the logical streams managed by the journal_*() routines.
791  */
792 
793 /*
794  * Initialize the passed jrecord structure and start a new stream transaction
795  * by reserving an initial build space in the journal's memory FIFO.
796  */
797 void
798 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
799 {
800     bzero(jrec, sizeof(*jrec));
801     jrec->jo = jo;
802     jrec->streamid = streamid;
803     jrec->stream_residual = JREC_DEFAULTSIZE;
804     jrec->stream_reserved = jrec->stream_residual;
805     jrec->stream_ptr =
806 	journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
807 }
808 
809 /*
810  * Push a recursive record type.  All pushes should have matching pops.
811  * The old parent is returned and the newly pushed record becomes the
812  * new parent.  Note that the old parent's pointer may already be invalid
813  * or may become invalid if jrecord_write() had to build a new stream
814  * record, so the caller should not mess with the returned pointer in
815  * any way other then to save it.
816  */
817 struct journal_subrecord *
818 jrecord_push(struct jrecord *jrec, int16_t rectype)
819 {
820     struct journal_subrecord *save;
821 
822     save = jrec->parent;
823     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
824     jrec->last = NULL;
825     KKASSERT(jrec->parent != NULL);
826     ++jrec->pushcount;
827     ++jrec->pushptrgood;	/* cleared on flush */
828     return(save);
829 }
830 
831 /*
832  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
833  * on the last record written within the subtransaction.  If the last
834  * record written is not accessible or if the subtransaction is empty,
835  * we must write out a pad record with JMASK_LAST set before popping.
836  *
837  * When popping a subtransaction the parent record's recsize field
838  * will be properly set.  If the parent pointer is no longer valid
839  * (which can occur if the data has already been flushed out to the
840  * stream), the protocol spec allows us to leave it 0.
841  *
842  * The saved parent pointer which we restore may or may not be valid,
843  * and if not valid may or may not be NULL, depending on the value
844  * of pushptrgood.
845  */
846 void
847 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
848 {
849     struct journal_subrecord *last;
850 
851     KKASSERT(jrec->pushcount > 0);
852     KKASSERT(jrec->residual == 0);
853 
854     /*
855      * Set JMASK_LAST on the last record we wrote at the current
856      * level.  If last is NULL we either no longer have access to the
857      * record or the subtransaction was empty and we must write out a pad
858      * record.
859      */
860     if ((last = jrec->last) == NULL) {
861 	jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
862 	last = jrec->last;	/* reload after possible flush */
863     } else {
864 	last->rectype |= JMASK_LAST;
865     }
866 
867     /*
868      * pushptrgood tells us how many levels of parent record pointers
869      * are valid.  The jrec only stores the current parent record pointer
870      * (and it is only valid if pushptrgood != 0).  The higher level parent
871      * record pointers are saved by the routines calling jrecord_push() and
872      * jrecord_pop().  These pointers may become stale and we determine
873      * that fact by tracking the count of valid parent pointers with
874      * pushptrgood.  Pointers become invalid when their related stream
875      * record gets pushed out.
876      *
877      * If no pointer is available (the data has already been pushed out),
878      * then no fixup of e.g. the length field is possible for non-leaf
879      * nodes.  The protocol allows for this situation by placing a larger
880      * burden on the program scanning the stream on the other end.
881      *
882      * [parentA]
883      *	  [node X]
884      *    [parentB]
885      *	     [node Y]
886      *	     [node Z]
887      *    (pop B)	see NOTE B
888      * (pop A)		see NOTE A
889      *
890      * NOTE B:	This pop sets LAST in node Z if the node is still accessible,
891      *		else a PAD record is appended and LAST is set in that.
892      *
893      *		This pop sets the record size in parentB if parentB is still
894      *		accessible, else the record size is left 0 (the scanner must
895      *		deal with that).
896      *
897      *		This pop sets the new 'last' record to parentB, the pointer
898      *		to which may or may not still be accessible.
899      *
900      * NOTE A:	This pop sets LAST in parentB if the node is still accessible,
901      *		else a PAD record is appended and LAST is set in that.
902      *
903      *		This pop sets the record size in parentA if parentA is still
904      *		accessible, else the record size is left 0 (the scanner must
905      *		deal with that).
906      *
907      *		This pop sets the new 'last' record to parentA, the pointer
908      *		to which may or may not still be accessible.
909      *
910      * Also note that the last record in the stream transaction, which in
911      * the above example is parentA, does not currently have the LAST bit
912      * set.
913      *
914      * The current parent becomes the last record relative to the
915      * saved parent passed into us.  It's validity is based on
916      * whether pushptrgood is non-zero prior to decrementing.  The saved
917      * parent becomes the new parent, and its validity is based on whether
918      * pushptrgood is non-zero after decrementing.
919      *
920      * The old jrec->parent may be NULL if it is no longer accessible.
921      * If pushptrgood is non-zero, however, it is guarenteed to not
922      * be NULL (since no flush occured).
923      */
924     jrec->last = jrec->parent;
925     --jrec->pushcount;
926     if (jrec->pushptrgood) {
927 	KKASSERT(jrec->last != NULL && last != NULL);
928 	if (--jrec->pushptrgood == 0) {
929 	    jrec->parent = NULL;	/* 'save' contains garbage or NULL */
930 	} else {
931 	    KKASSERT(save != NULL);
932 	    jrec->parent = save;	/* 'save' must not be NULL */
933 	}
934 
935 	/*
936 	 * Set the record size in the old parent.  'last' still points to
937 	 * the original last record in the subtransaction being popped,
938 	 * jrec->last points to the old parent (which became the last
939 	 * record relative to the new parent being popped into).
940 	 */
941 	jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
942     } else {
943 	jrec->parent = NULL;
944 	KKASSERT(jrec->last == NULL);
945     }
946 }
947 
948 /*
949  * Write out a leaf record, including associated data.
950  */
951 void
952 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
953 {
954     jrecord_write(jrec, rectype, bytes);
955     jrecord_data(jrec, ptr, bytes, JDATA_KERN);
956 }
957 
958 void
959 jrecord_leaf_uio(struct jrecord *jrec, int16_t rectype,
960 		 struct uio *uio)
961 {
962     struct iovec *iov;
963     int i;
964 
965     for (i = 0; i < uio->uio_iovcnt; ++i) {
966 	iov = &uio->uio_iov[i];
967 	if (iov->iov_len == 0)
968 	    continue;
969 	if (uio->uio_segflg == UIO_SYSSPACE) {
970 	    jrecord_write(jrec, rectype, iov->iov_len);
971 	    jrecord_data(jrec, iov->iov_base, iov->iov_len, JDATA_KERN);
972 	} else { /* UIO_USERSPACE */
973 	    jrecord_write(jrec, rectype, iov->iov_len);
974 	    jrecord_data(jrec, iov->iov_base, iov->iov_len, JDATA_USER);
975 	}
976     }
977 }
978 
979 void
980 jrecord_leaf_xio(struct jrecord *jrec, int16_t rectype, xio_t xio)
981 {
982     int bytes = xio->xio_npages * PAGE_SIZE;
983 
984     jrecord_write(jrec, rectype, bytes);
985     jrecord_data(jrec, xio, bytes, JDATA_XIO);
986 }
987 
988 /*
989  * Write a leaf record out and return a pointer to its base.  The leaf
990  * record may contain potentially megabytes of data which is supplied
991  * in jrecord_data() calls.  The exact amount must be specified in this
992  * call.
993  *
994  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
995  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
996  * USE THE RETURN VALUE.
997  */
998 struct journal_subrecord *
999 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1000 {
1001     struct journal_subrecord *last;
1002     int pusheditout;
1003 
1004     /*
1005      * Try to catch some obvious errors.  Nesting records must specify a
1006      * size of 0, and there should be no left-overs from previous operations
1007      * (such as incomplete data writeouts).
1008      */
1009     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1010     KKASSERT(jrec->residual == 0);
1011 
1012     /*
1013      * Check to see if the current stream record has enough room for
1014      * the new subrecord header.  If it doesn't we extend the current
1015      * stream record.
1016      *
1017      * This may have the side effect of pushing out the current stream record
1018      * and creating a new one.  We must adjust our stream tracking fields
1019      * accordingly.
1020      */
1021     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1022 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1023 				jrec->stream_reserved - jrec->stream_residual,
1024 				JREC_DEFAULTSIZE, &pusheditout);
1025 	if (pusheditout) {
1026 	    /*
1027 	     * If a pushout occured, the pushed out stream record was
1028 	     * truncated as specified and the new record is exactly the
1029 	     * extension size specified.
1030 	     */
1031 	    jrec->stream_reserved = JREC_DEFAULTSIZE;
1032 	    jrec->stream_residual = JREC_DEFAULTSIZE;
1033 	    jrec->parent = NULL;	/* no longer accessible */
1034 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1035 	} else {
1036 	    /*
1037 	     * If no pushout occured the stream record is NOT truncated and
1038 	     * IS extended.
1039 	     */
1040 	    jrec->stream_reserved += JREC_DEFAULTSIZE;
1041 	    jrec->stream_residual += JREC_DEFAULTSIZE;
1042 	}
1043     }
1044     last = (void *)jrec->stream_ptr;
1045     last->rectype = rectype;
1046     last->reserved = 0;
1047 
1048     /*
1049      * We may not know the record size for recursive records and the
1050      * header may become unavailable due to limited FIFO space.  Write
1051      * -1 to indicate this special case.
1052      */
1053     if ((rectype & JMASK_NESTED) && bytes == 0)
1054 	last->recsize = -1;
1055     else
1056 	last->recsize = sizeof(struct journal_subrecord) + bytes;
1057     jrec->last = last;
1058     jrec->residual = bytes;		/* remaining data to be posted */
1059     jrec->residual_align = -bytes & 7;	/* post-data alignment required */
1060     jrec->stream_ptr += sizeof(*last);	/* current write pointer */
1061     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1062     return(last);
1063 }
1064 
1065 /*
1066  * Write out the data associated with a leaf record.  Any number of calls
1067  * to this routine may be made as long as the byte count adds up to the
1068  * amount originally specified in jrecord_write().
1069  *
1070  * The act of writing out the leaf data may result in numerous stream records
1071  * being pushed out.   Callers should be aware that even the associated
1072  * subrecord header may become inaccessible due to stream record pushouts.
1073  */
1074 static void
1075 jrecord_data(struct jrecord *jrec, void *buf, int bytes, int dtype)
1076 {
1077     int pusheditout;
1078     int extsize;
1079     int xio_offset = 0;
1080 
1081     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1082 
1083     /*
1084      * Push out stream records as long as there is insufficient room to hold
1085      * the remaining data.
1086      */
1087     while (jrec->stream_residual < bytes) {
1088 	/*
1089 	 * Fill in any remaining space in the current stream record.
1090 	 */
1091 	switch (dtype) {
1092 	case JDATA_KERN:
1093 	    bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1094 	    break;
1095 	case JDATA_USER:
1096 	    copyin(buf, jrec->stream_ptr, jrec->stream_residual);
1097 	    break;
1098 	case JDATA_XIO:
1099 	    xio_copy_xtok((xio_t)buf, xio_offset, jrec->stream_ptr,
1100 			  jrec->stream_residual);
1101 	    xio_offset += jrec->stream_residual;
1102 	    break;
1103 	}
1104 	if (dtype != JDATA_XIO)
1105 	    buf = (char *)buf + jrec->stream_residual;
1106 	bytes -= jrec->stream_residual;
1107 	/*jrec->stream_ptr += jrec->stream_residual;*/
1108 	jrec->residual -= jrec->stream_residual;
1109 	jrec->stream_residual = 0;
1110 
1111 	/*
1112 	 * Try to extend the current stream record, but no more then 1/4
1113 	 * the size of the FIFO.
1114 	 */
1115 	extsize = jrec->jo->fifo.size >> 2;
1116 	if (extsize > bytes)
1117 	    extsize = (bytes + 15) & ~15;
1118 
1119 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1120 				jrec->stream_reserved - jrec->stream_residual,
1121 				extsize, &pusheditout);
1122 	if (pusheditout) {
1123 	    jrec->stream_reserved = extsize;
1124 	    jrec->stream_residual = extsize;
1125 	    jrec->parent = NULL;	/* no longer accessible */
1126 	    jrec->last = NULL;		/* no longer accessible */
1127 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1128 	} else {
1129 	    jrec->stream_reserved += extsize;
1130 	    jrec->stream_residual += extsize;
1131 	}
1132     }
1133 
1134     /*
1135      * Push out any remaining bytes into the current stream record.
1136      */
1137     if (bytes) {
1138 	switch (dtype) {
1139 	case JDATA_KERN:
1140 	    bcopy(buf, jrec->stream_ptr, bytes);
1141 	    break;
1142 	case JDATA_USER:
1143 	    copyin(buf, jrec->stream_ptr, bytes);
1144 	    break;
1145 	case JDATA_XIO:
1146 	    xio_copy_xtok((xio_t)buf, xio_offset, jrec->stream_ptr, bytes);
1147 	    break;
1148 	}
1149 	jrec->stream_ptr += bytes;
1150 	jrec->stream_residual -= bytes;
1151 	jrec->residual -= bytes;
1152     }
1153 
1154     /*
1155      * Handle data alignment requirements for the subrecord.  Because the
1156      * stream record's data space is more strictly aligned, it must already
1157      * have sufficient space to hold any subrecord alignment slop.
1158      */
1159     if (jrec->residual == 0 && jrec->residual_align) {
1160 	KKASSERT(jrec->residual_align <= jrec->stream_residual);
1161 	bzero(jrec->stream_ptr, jrec->residual_align);
1162 	jrec->stream_ptr += jrec->residual_align;
1163 	jrec->stream_residual -= jrec->residual_align;
1164 	jrec->residual_align = 0;
1165     }
1166 }
1167 
1168 /*
1169  * We are finished with the transaction.  This closes the transaction created
1170  * by jrecord_init().
1171  *
1172  * NOTE: If abortit is not set then we must be at the top level with no
1173  *	 residual subrecord data left to output.
1174  *
1175  *	 If abortit is set then we can be in any state, all pushes will be
1176  *	 popped and it is ok for there to be residual data.  This works
1177  *	 because the virtual stream itself is truncated.  Scanners must deal
1178  *	 with this situation.
1179  *
1180  * The stream record will be committed or aborted as specified and jrecord
1181  * resources will be cleaned up.
1182  */
1183 void
1184 jrecord_done(struct jrecord *jrec, int abortit)
1185 {
1186     KKASSERT(jrec->rawp != NULL);
1187 
1188     if (abortit) {
1189 	journal_abort(jrec->jo, &jrec->rawp);
1190     } else {
1191 	KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1192 	journal_commit(jrec->jo, &jrec->rawp,
1193 			jrec->stream_reserved - jrec->stream_residual, 1);
1194     }
1195 
1196     /*
1197      * jrec should not be used beyond this point without another init,
1198      * but clean up some fields to ensure that we panic if it is.
1199      *
1200      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1201      */
1202     jrec->jo = NULL;
1203     jrec->stream_ptr = NULL;
1204 }
1205 
1206 /************************************************************************
1207  *			LOW LEVEL RECORD SUPPORT ROUTINES		*
1208  ************************************************************************
1209  *
1210  * These routine create low level recursive and leaf subrecords representing
1211  * common filesystem structures.
1212  */
1213 
1214 /*
1215  * Write out a filename path relative to the base of the mount point.
1216  * rectype is typically JLEAF_PATH{1,2,3,4}.
1217  */
1218 void
1219 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1220 {
1221     char buf[64];	/* local buffer if it fits, else malloced */
1222     char *base;
1223     int pathlen;
1224     int index;
1225     struct namecache *scan;
1226 
1227     /*
1228      * Pass 1 - figure out the number of bytes required.  Include terminating
1229      * 	       \0 on last element and '/' separator on other elements.
1230      *
1231      * The namecache topology terminates at the root of the filesystem
1232      * (the normal lookup code would then continue by using the mount
1233      * structure to figure out what it was mounted on).
1234      */
1235 again:
1236     pathlen = 0;
1237     for (scan = ncp; scan; scan = scan->nc_parent) {
1238 	if (scan->nc_nlen > 0)
1239 	    pathlen += scan->nc_nlen + 1;
1240     }
1241 
1242     if (pathlen <= sizeof(buf))
1243 	base = buf;
1244     else
1245 	base = kmalloc(pathlen, M_TEMP, M_INTWAIT);
1246 
1247     /*
1248      * Pass 2 - generate the path buffer
1249      */
1250     index = pathlen;
1251     for (scan = ncp; scan; scan = scan->nc_parent) {
1252 	if (scan->nc_nlen == 0)
1253 	    continue;
1254 	if (scan->nc_nlen >= index) {
1255 	    if (base != buf)
1256 		kfree(base, M_TEMP);
1257 	    goto again;
1258 	}
1259 	if (index == pathlen)
1260 	    base[--index] = 0;
1261 	else
1262 	    base[--index] = '/';
1263 	index -= scan->nc_nlen;
1264 	bcopy(scan->nc_name, base + index, scan->nc_nlen);
1265     }
1266     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1267     if (base != buf)
1268 	kfree(base, M_TEMP);
1269 }
1270 
1271 /*
1272  * Write out a file attribute structure.  While somewhat inefficient, using
1273  * a recursive data structure is the most portable and extensible way.
1274  */
1275 void
1276 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1277 {
1278     void *save;
1279 
1280     save = jrecord_push(jrec, JTYPE_VATTR);
1281     if (vat->va_type != VNON)
1282 	jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1283     if (vat->va_mode != (mode_t)VNOVAL)
1284 	jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1285     if (vat->va_nlink != VNOVAL)
1286 	jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1287     if (vat->va_uid != VNOVAL)
1288 	jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1289     if (vat->va_gid != VNOVAL)
1290 	jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1291     if (vat->va_fsid != VNOVAL)
1292 	jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1293     if (vat->va_fileid != VNOVAL)
1294 	jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1295     if (vat->va_size != VNOVAL)
1296 	jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1297     if (vat->va_atime.tv_sec != VNOVAL)
1298 	jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1299     if (vat->va_mtime.tv_sec != VNOVAL)
1300 	jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1301     if (vat->va_ctime.tv_sec != VNOVAL)
1302 	jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1303     if (vat->va_gen != VNOVAL)
1304 	jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1305     if (vat->va_flags != VNOVAL)
1306 	jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1307     if (vat->va_rmajor != VNOVAL) {
1308 	udev_t rdev = makeudev(vat->va_rmajor, vat->va_rminor);
1309 	jrecord_leaf(jrec, JLEAF_UDEV, &rdev, sizeof(rdev));
1310 	jrecord_leaf(jrec, JLEAF_UMAJOR, &vat->va_rmajor, sizeof(vat->va_rmajor));
1311 	jrecord_leaf(jrec, JLEAF_UMINOR, &vat->va_rminor, sizeof(vat->va_rminor));
1312     }
1313 #if 0
1314     if (vat->va_filerev != VNOVAL)
1315 	jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1316 #endif
1317     jrecord_pop(jrec, save);
1318 }
1319 
1320 /*
1321  * Write out the creds used to issue a file operation.  If a process is
1322  * available write out additional tracking information related to the
1323  * process.
1324  *
1325  * XXX additional tracking info
1326  * XXX tty line info
1327  */
1328 void
1329 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1330 {
1331     void *save;
1332     struct proc *p;
1333 
1334     save = jrecord_push(jrec, JTYPE_CRED);
1335     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1336     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1337     if (td && (p = td->td_proc) != NULL) {
1338 	jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1339 	jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1340     }
1341     jrecord_pop(jrec, save);
1342 }
1343 
1344 /*
1345  * Write out information required to identify a vnode
1346  *
1347  * XXX this needs work.  We should write out the inode number as well,
1348  * and in fact avoid writing out the file path for seqential writes
1349  * occuring within e.g. a certain period of time.
1350  */
1351 void
1352 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1353 {
1354     struct nchandle nch;
1355 
1356     nch.mount = vp->v_mount;
1357     spin_lock(&vp->v_spin);
1358     TAILQ_FOREACH(nch.ncp, &vp->v_namecache, nc_vnode) {
1359 	if ((nch.ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1360 	    break;
1361     }
1362     if (nch.ncp) {
1363 	cache_hold(&nch);
1364 	spin_unlock(&vp->v_spin);
1365 	jrecord_write_path(jrec, JLEAF_PATH_REF, nch.ncp);
1366 	cache_drop(&nch);
1367     } else {
1368 	spin_unlock(&vp->v_spin);
1369     }
1370 }
1371 
1372 void
1373 jrecord_write_vnode_link(struct jrecord *jrec, struct vnode *vp,
1374 			 struct namecache *notncp)
1375 {
1376     struct nchandle nch;
1377 
1378     nch.mount = vp->v_mount;
1379     spin_lock(&vp->v_spin);
1380     TAILQ_FOREACH(nch.ncp, &vp->v_namecache, nc_vnode) {
1381 	if (nch.ncp == notncp)
1382 	    continue;
1383 	if ((nch.ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1384 	    break;
1385     }
1386     if (nch.ncp) {
1387 	cache_hold(&nch);
1388 	spin_unlock(&vp->v_spin);
1389 	jrecord_write_path(jrec, JLEAF_PATH_REF, nch.ncp);
1390 	cache_drop(&nch);
1391     } else {
1392 	spin_unlock(&vp->v_spin);
1393     }
1394 }
1395 
1396 /*
1397  * Write out the data represented by a pagelist
1398  */
1399 void
1400 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1401 			struct vm_page **pglist, int *rtvals, int pgcount,
1402 			off_t offset)
1403 {
1404     struct xio xio;
1405     int error;
1406     int b;
1407     int i;
1408 
1409     i = 0;
1410     xio_init(&xio);
1411     while (i < pgcount) {
1412 	/*
1413 	 * Find the next valid section.  Skip any invalid elements
1414 	 */
1415 	if (rtvals[i] != VM_PAGER_OK) {
1416 	    ++i;
1417 	    offset += PAGE_SIZE;
1418 	    continue;
1419 	}
1420 
1421 	/*
1422 	 * Figure out how big the valid section is, capping I/O at what the
1423 	 * MSFBUF can represent.
1424 	 */
1425 	b = i;
1426 	while (i < pgcount && i - b != XIO_INTERNAL_PAGES &&
1427 	       rtvals[i] == VM_PAGER_OK
1428 	) {
1429 	    ++i;
1430 	}
1431 
1432 	/*
1433 	 * And write it out.
1434 	 */
1435 	if (i - b) {
1436 	    error = xio_init_pages(&xio, pglist + b, i - b, XIOF_READ);
1437 	    if (error == 0) {
1438 		jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1439 		jrecord_leaf_xio(jrec, rectype, &xio);
1440 	    } else {
1441 		kprintf("jrecord_write_pagelist: xio init failure\n");
1442 	    }
1443 	    xio_release(&xio);
1444 	    offset += (off_t)(i - b) << PAGE_SHIFT;
1445 	}
1446     }
1447 }
1448 
1449 /*
1450  * Write out the data represented by a UIO.
1451  */
1452 void
1453 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1454 {
1455     if (uio->uio_segflg != UIO_NOCOPY) {
1456 	jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset,
1457 		     sizeof(uio->uio_offset));
1458 	jrecord_leaf_uio(jrec, rectype, uio);
1459     }
1460 }
1461 
1462 void
1463 jrecord_file_data(struct jrecord *jrec, struct vnode *vp,
1464 		  off_t off, off_t bytes)
1465 {
1466     const int bufsize = 8192;
1467     char *buf;
1468     int error;
1469     int n;
1470 
1471     buf = kmalloc(bufsize, M_JOURNAL, M_WAITOK);
1472     jrecord_leaf(jrec, JLEAF_SEEKPOS, &off, sizeof(off));
1473     while (bytes) {
1474 	n = (bytes > bufsize) ? bufsize : (int)bytes;
1475 	error = vn_rdwr(UIO_READ, vp, buf, n, off, UIO_SYSSPACE, IO_NODELOCKED,
1476 			proc0.p_ucred, NULL);
1477 	if (error) {
1478 	    jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
1479 	    break;
1480 	}
1481 	jrecord_leaf(jrec, JLEAF_FILEDATA, buf, n);
1482 	bytes -= n;
1483 	off += n;
1484     }
1485     kfree(buf, M_JOURNAL);
1486 }
1487 
1488