xref: /dragonfly/sys/kern/vfs_jops.c (revision 87de5057)
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_jops.c,v 1.24 2006/05/06 02:43:12 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread.
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger
60  * spooling areas then the memory buffer is able to provide by e.g.
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/journal.h>
88 #include <sys/file.h>
89 #include <sys/proc.h>
90 #include <sys/msfbuf.h>
91 #include <sys/socket.h>
92 #include <sys/socketvar.h>
93 
94 #include <machine/limits.h>
95 
96 #include <vm/vm.h>
97 #include <vm/vm_object.h>
98 #include <vm/vm_page.h>
99 #include <vm/vm_pager.h>
100 #include <vm/vnode_pager.h>
101 
102 #include <sys/file2.h>
103 #include <sys/thread2.h>
104 
105 static int journal_attach(struct mount *mp);
106 static void journal_detach(struct mount *mp);
107 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
108 			    const struct mountctl_install_journal *info);
109 static int journal_restart_vfs_journal(struct mount *mp, struct file *fp,
110 			    const struct mountctl_restart_journal *info);
111 static int journal_remove_vfs_journal(struct mount *mp,
112 			    const struct mountctl_remove_journal *info);
113 static int journal_restart(struct mount *mp, struct file *fp,
114 			    struct journal *jo, int flags);
115 static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
116 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
117 static int journal_status_vfs_journal(struct mount *mp,
118 		       const struct mountctl_status_journal *info,
119 		       struct mountctl_journal_ret_status *rstat,
120 		       int buflen, int *res);
121 static void journal_create_threads(struct journal *jo);
122 static void journal_destroy_threads(struct journal *jo, int flags);
123 static void journal_wthread(void *info);
124 static void journal_rthread(void *info);
125 
126 static void *journal_reserve(struct journal *jo,
127 			    struct journal_rawrecbeg **rawpp,
128 			    int16_t streamid, int bytes);
129 static void *journal_extend(struct journal *jo,
130 			    struct journal_rawrecbeg **rawpp,
131 			    int truncbytes, int bytes, int *newstreamrecp);
132 static void journal_abort(struct journal *jo,
133 			    struct journal_rawrecbeg **rawpp);
134 static void journal_commit(struct journal *jo,
135 			    struct journal_rawrecbeg **rawpp,
136 			    int bytes, int closeout);
137 
138 static void jrecord_init(struct journal *jo,
139 			    struct jrecord *jrec, int16_t streamid);
140 static struct journal_subrecord *jrecord_push(
141 			    struct jrecord *jrec, int16_t rectype);
142 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
143 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
144 			    int16_t rectype, int bytes);
145 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
146 static void jrecord_done(struct jrecord *jrec, int abortit);
147 static void jrecord_undo_file(struct jrecord *jrec, struct vnode *vp,
148 			    int jrflags, off_t off, off_t bytes);
149 
150 static int journal_setattr(struct vop_setattr_args *ap);
151 static int journal_write(struct vop_write_args *ap);
152 static int journal_fsync(struct vop_fsync_args *ap);
153 static int journal_putpages(struct vop_putpages_args *ap);
154 static int journal_setacl(struct vop_setacl_args *ap);
155 static int journal_setextattr(struct vop_setextattr_args *ap);
156 static int journal_ncreate(struct vop_ncreate_args *ap);
157 static int journal_nmknod(struct vop_nmknod_args *ap);
158 static int journal_nlink(struct vop_nlink_args *ap);
159 static int journal_nsymlink(struct vop_nsymlink_args *ap);
160 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
161 static int journal_nremove(struct vop_nremove_args *ap);
162 static int journal_nmkdir(struct vop_nmkdir_args *ap);
163 static int journal_nrmdir(struct vop_nrmdir_args *ap);
164 static int journal_nrename(struct vop_nrename_args *ap);
165 
166 #define JRUNDO_SIZE	0x00000001
167 #define JRUNDO_UID	0x00000002
168 #define JRUNDO_GID	0x00000004
169 #define JRUNDO_FSID	0x00000008
170 #define JRUNDO_MODES	0x00000010
171 #define JRUNDO_INUM	0x00000020
172 #define JRUNDO_ATIME	0x00000040
173 #define JRUNDO_MTIME	0x00000080
174 #define JRUNDO_CTIME	0x00000100
175 #define JRUNDO_GEN	0x00000200
176 #define JRUNDO_FLAGS	0x00000400
177 #define JRUNDO_UDEV	0x00000800
178 #define JRUNDO_NLINK	0x00001000
179 #define JRUNDO_FILEDATA	0x00010000
180 #define JRUNDO_GETVP	0x00020000
181 #define JRUNDO_CONDLINK	0x00040000	/* write file data if link count 1 */
182 #define JRUNDO_VATTR	(JRUNDO_SIZE|JRUNDO_UID|JRUNDO_GID|JRUNDO_FSID|\
183 			 JRUNDO_MODES|JRUNDO_INUM|JRUNDO_ATIME|JRUNDO_MTIME|\
184 			 JRUNDO_CTIME|JRUNDO_GEN|JRUNDO_FLAGS|JRUNDO_UDEV|\
185 			 JRUNDO_NLINK)
186 #define JRUNDO_ALL	(JRUNDO_VATTR|JRUNDO_FILEDATA)
187 
188 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
189     { &vop_default_desc,		vop_journal_operate_ap },
190     { &vop_mountctl_desc,		(void *)journal_mountctl },
191     { &vop_setattr_desc,		(void *)journal_setattr },
192     { &vop_write_desc,			(void *)journal_write },
193     { &vop_fsync_desc,			(void *)journal_fsync },
194     { &vop_putpages_desc,		(void *)journal_putpages },
195     { &vop_setacl_desc,			(void *)journal_setacl },
196     { &vop_setextattr_desc,		(void *)journal_setextattr },
197     { &vop_ncreate_desc,		(void *)journal_ncreate },
198     { &vop_nmknod_desc,			(void *)journal_nmknod },
199     { &vop_nlink_desc,			(void *)journal_nlink },
200     { &vop_nsymlink_desc,		(void *)journal_nsymlink },
201     { &vop_nwhiteout_desc,		(void *)journal_nwhiteout },
202     { &vop_nremove_desc,		(void *)journal_nremove },
203     { &vop_nmkdir_desc,			(void *)journal_nmkdir },
204     { &vop_nrmdir_desc,			(void *)journal_nrmdir },
205     { &vop_nrename_desc,		(void *)journal_nrename },
206     { NULL, NULL }
207 };
208 
209 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
210 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
211 
212 int
213 journal_mountctl(struct vop_mountctl_args *ap)
214 {
215     struct mount *mp;
216     int error = 0;
217 
218     mp = ap->a_head.a_ops->vv_mount;
219     KKASSERT(mp);
220 
221     if (mp->mnt_vn_journal_ops == NULL) {
222 	switch(ap->a_op) {
223 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
224 	    error = journal_attach(mp);
225 	    if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
226 		error = EINVAL;
227 	    if (error == 0 && ap->a_fp == NULL)
228 		error = EBADF;
229 	    if (error == 0)
230 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
231 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
232 		journal_detach(mp);
233 	    break;
234 	case MOUNTCTL_RESTART_VFS_JOURNAL:
235 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
236 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
237 	case MOUNTCTL_STATUS_VFS_JOURNAL:
238 	    error = ENOENT;
239 	    break;
240 	default:
241 	    error = EOPNOTSUPP;
242 	    break;
243 	}
244     } else {
245 	switch(ap->a_op) {
246 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
247 	    if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
248 		error = EINVAL;
249 	    if (error == 0 && ap->a_fp == NULL)
250 		error = EBADF;
251 	    if (error == 0)
252 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
253 	    break;
254 	case MOUNTCTL_RESTART_VFS_JOURNAL:
255 	    if (ap->a_ctllen != sizeof(struct mountctl_restart_journal))
256 		error = EINVAL;
257 	    if (error == 0 && ap->a_fp == NULL)
258 		error = EBADF;
259 	    if (error == 0)
260 		error = journal_restart_vfs_journal(mp, ap->a_fp, ap->a_ctl);
261 	    break;
262 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
263 	    if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
264 		error = EINVAL;
265 	    if (error == 0)
266 		error = journal_remove_vfs_journal(mp, ap->a_ctl);
267 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
268 		journal_detach(mp);
269 	    break;
270 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
271 	    if (ap->a_ctllen != 0)
272 		error = EINVAL;
273 	    error = journal_resync_vfs_journal(mp, ap->a_ctl);
274 	    break;
275 	case MOUNTCTL_STATUS_VFS_JOURNAL:
276 	    if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
277 		error = EINVAL;
278 	    if (error == 0) {
279 		error = journal_status_vfs_journal(mp, ap->a_ctl,
280 					ap->a_buf, ap->a_buflen, ap->a_res);
281 	    }
282 	    break;
283 	default:
284 	    error = EOPNOTSUPP;
285 	    break;
286 	}
287     }
288     return (error);
289 }
290 
291 /*
292  * High level mount point setup.  When a
293  */
294 static int
295 journal_attach(struct mount *mp)
296 {
297     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops,
298 		     journal_vnodeop_entries, 0);
299     return(0);
300 }
301 
302 static void
303 journal_detach(struct mount *mp)
304 {
305     if (mp->mnt_vn_journal_ops)
306 	vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
307 }
308 
309 /*
310  * Install a journal on a mount point.  Each journal has an associated worker
311  * thread which is responsible for buffering and spooling the data to the
312  * target.  A mount point may have multiple journals attached to it.  An
313  * initial start record is generated when the journal is associated.
314  */
315 static int
316 journal_install_vfs_journal(struct mount *mp, struct file *fp,
317 			    const struct mountctl_install_journal *info)
318 {
319     struct journal *jo;
320     struct jrecord jrec;
321     int error = 0;
322     int size;
323 
324     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
325     bcopy(info->id, jo->id, sizeof(jo->id));
326     jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
327 				MC_JOURNAL_STOP_REQ);
328 
329     /*
330      * Memory FIFO size, round to nearest power of 2
331      */
332     if (info->membufsize) {
333 	if (info->membufsize < 65536)
334 	    size = 65536;
335 	else if (info->membufsize > 128 * 1024 * 1024)
336 	    size = 128 * 1024 * 1024;
337 	else
338 	    size = (int)info->membufsize;
339     } else {
340 	size = 1024 * 1024;
341     }
342     jo->fifo.size = 1;
343     while (jo->fifo.size < size)
344 	jo->fifo.size <<= 1;
345 
346     /*
347      * Other parameters.  If not specified the starting transaction id
348      * will be the current date.
349      */
350     if (info->transid) {
351 	jo->transid = info->transid;
352     } else {
353 	struct timespec ts;
354 	getnanotime(&ts);
355 	jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
356     }
357 
358     jo->fp = fp;
359 
360     /*
361      * Allocate the memory FIFO
362      */
363     jo->fifo.mask = jo->fifo.size - 1;
364     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
365     if (jo->fifo.membase == NULL)
366 	error = ENOMEM;
367 
368     /*
369      * Create the worker threads and generate the association record.
370      */
371     if (error) {
372 	free(jo, M_JOURNAL);
373     } else {
374 	fhold(fp);
375 	journal_create_threads(jo);
376 	jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
377 	jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
378 	jrecord_done(&jrec, 0);
379 	TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
380     }
381     return(error);
382 }
383 
384 /*
385  * Restart a journal with a new descriptor.   The existing reader and writer
386  * threads are terminated and a new descriptor is associated with the
387  * journal.  The FIFO rindex is reset to xindex and the threads are then
388  * restarted.
389  */
390 static int
391 journal_restart_vfs_journal(struct mount *mp, struct file *fp,
392 			   const struct mountctl_restart_journal *info)
393 {
394     struct journal *jo;
395     int error;
396 
397     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
398 	if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
399 	    break;
400     }
401     if (jo)
402 	error = journal_restart(mp, fp, jo, info->flags);
403     else
404 	error = EINVAL;
405     return (error);
406 }
407 
408 static int
409 journal_restart(struct mount *mp, struct file *fp,
410 		struct journal *jo, int flags)
411 {
412     /*
413      * XXX lock the jo
414      */
415 
416 #if 0
417     /*
418      * Record the fact that we are doing a restart in the journal.
419      * XXX it isn't safe to do this if the journal is being restarted
420      * because it was locked up and the writer thread has already exited.
421      */
422     jrecord_init(jo, &jrec, JREC_STREAMID_RESTART);
423     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
424     jrecord_done(&jrec, 0);
425 #endif
426 
427     /*
428      * Stop the reader and writer threads and clean up the current
429      * descriptor.
430      */
431     printf("RESTART WITH FP %p KILLING %p\n", fp, jo->fp);
432     journal_destroy_threads(jo, flags);
433 
434     if (jo->fp)
435 	fdrop(jo->fp, curthread);
436 
437     /*
438      * Associate the new descriptor, reset the FIFO index, and recreate
439      * the threads.
440      */
441     fhold(fp);
442     jo->fp = fp;
443     jo->fifo.rindex = jo->fifo.xindex;
444     journal_create_threads(jo);
445 
446     return(0);
447 }
448 
449 /*
450  * Disassociate a journal from a mount point and terminate its worker thread.
451  * A final termination record is written out before the file pointer is
452  * dropped.
453  */
454 static int
455 journal_remove_vfs_journal(struct mount *mp,
456 			   const struct mountctl_remove_journal *info)
457 {
458     struct journal *jo;
459     int error;
460 
461     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
462 	if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
463 	    break;
464     }
465     if (jo)
466 	error = journal_destroy(mp, jo, info->flags);
467     else
468 	error = EINVAL;
469     return (error);
470 }
471 
472 /*
473  * Remove all journals associated with a mount point.  Usually called
474  * by the umount code.
475  */
476 void
477 journal_remove_all_journals(struct mount *mp, int flags)
478 {
479     struct journal *jo;
480 
481     while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
482 	journal_destroy(mp, jo, flags);
483     }
484 }
485 
486 static int
487 journal_destroy(struct mount *mp, struct journal *jo, int flags)
488 {
489     struct jrecord jrec;
490 
491     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
492 
493     jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
494     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
495     jrecord_done(&jrec, 0);
496 
497     journal_destroy_threads(jo, flags);
498 
499     if (jo->fp)
500 	fdrop(jo->fp, curthread);
501     if (jo->fifo.membase)
502 	free(jo->fifo.membase, M_JFIFO);
503     free(jo, M_JOURNAL);
504     return(0);
505 }
506 
507 static int
508 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
509 {
510     return(EINVAL);
511 }
512 
513 static int
514 journal_status_vfs_journal(struct mount *mp,
515 		       const struct mountctl_status_journal *info,
516 		       struct mountctl_journal_ret_status *rstat,
517 		       int buflen, int *res)
518 {
519     struct journal *jo;
520     int error = 0;
521     int index;
522 
523     index = 0;
524     *res = 0;
525     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
526 	if (info->index == MC_JOURNAL_INDEX_ID) {
527 	    if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
528 		continue;
529 	} else if (info->index >= 0) {
530 	    if (info->index < index)
531 		continue;
532 	} else if (info->index != MC_JOURNAL_INDEX_ALL) {
533 	    continue;
534 	}
535 	if (buflen < sizeof(*rstat)) {
536 	    if (*res)
537 		rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
538 	    else
539 		error = EINVAL;
540 	    break;
541 	}
542 	bzero(rstat, sizeof(*rstat));
543 	rstat->recsize = sizeof(*rstat);
544 	bcopy(jo->id, rstat->id, sizeof(jo->id));
545 	rstat->index = index;
546 	rstat->membufsize = jo->fifo.size;
547 	rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
548 	rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
549 	rstat->bytessent = jo->total_acked;
550 	rstat->fifostalls = jo->fifostalls;
551 	++rstat;
552 	++index;
553 	*res += sizeof(*rstat);
554 	buflen -= sizeof(*rstat);
555     }
556     return(error);
557 }
558 
559 static void
560 journal_create_threads(struct journal *jo)
561 {
562 	jo->flags &= ~(MC_JOURNAL_STOP_REQ | MC_JOURNAL_STOP_IMM);
563 	jo->flags |= MC_JOURNAL_WACTIVE;
564 	lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
565 			TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
566 	lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
567 	lwkt_schedule(&jo->wthread);
568 
569 	if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
570 	    jo->flags |= MC_JOURNAL_RACTIVE;
571 	    lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
572 			TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
573 	    lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
574 	    lwkt_schedule(&jo->rthread);
575 	}
576 }
577 
578 static void
579 journal_destroy_threads(struct journal *jo, int flags)
580 {
581     int wcount;
582 
583     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
584     wakeup(&jo->fifo);
585     wcount = 0;
586     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
587 	tsleep(jo, 0, "jwait", hz);
588 	if (++wcount % 10 == 0) {
589 	    printf("Warning: journal %s waiting for descriptors to close\n",
590 		jo->id);
591 	}
592     }
593 
594     /*
595      * XXX SMP - threads should move to cpu requesting the restart or
596      * termination before finishing up to properly interlock.
597      */
598     tsleep(jo, 0, "jwait", hz);
599     lwkt_free_thread(&jo->wthread);
600     if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
601 	lwkt_free_thread(&jo->rthread);
602 }
603 
604 /*
605  * The per-journal worker thread is responsible for writing out the
606  * journal's FIFO to the target stream.
607  */
608 static void
609 journal_wthread(void *info)
610 {
611     struct journal *jo = info;
612     struct journal_rawrecbeg *rawp;
613     int bytes;
614     int error;
615     int avail;
616     int res;
617 
618     for (;;) {
619 	/*
620 	 * Calculate the number of bytes available to write.  This buffer
621 	 * area may contain reserved records so we can't just write it out
622 	 * without further checks.
623 	 */
624 	bytes = jo->fifo.windex - jo->fifo.rindex;
625 
626 	/*
627 	 * sleep if no bytes are available or if an incomplete record is
628 	 * encountered (it needs to be filled in before we can write it
629 	 * out), and skip any pad records that we encounter.
630 	 */
631 	if (bytes == 0) {
632 	    if (jo->flags & MC_JOURNAL_STOP_REQ)
633 		break;
634 	    tsleep(&jo->fifo, 0, "jfifo", hz);
635 	    continue;
636 	}
637 
638 	/*
639 	 * Sleep if we can not go any further due to hitting an incomplete
640 	 * record.  This case should occur rarely but may have to be better
641 	 * optimized XXX.
642 	 */
643 	rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
644 	if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
645 	    tsleep(&jo->fifo, 0, "jpad", hz);
646 	    continue;
647 	}
648 
649 	/*
650 	 * Skip any pad records.  We do not write out pad records if we can
651 	 * help it.
652 	 */
653 	if (rawp->streamid == JREC_STREAMID_PAD) {
654 	    if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
655 		if (jo->fifo.rindex == jo->fifo.xindex) {
656 		    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
657 		    jo->total_acked += (rawp->recsize + 15) & ~15;
658 		}
659 	    }
660 	    jo->fifo.rindex += (rawp->recsize + 15) & ~15;
661 	    jo->total_acked += bytes;
662 	    KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
663 	    continue;
664 	}
665 
666 	/*
667 	 * 'bytes' is the amount of data that can potentially be written out.
668 	 * Calculate 'res', the amount of data that can actually be written
669 	 * out.  res is bounded either by hitting the end of the physical
670 	 * memory buffer or by hitting an incomplete record.  Incomplete
671 	 * records often occur due to the way the space reservation model
672 	 * works.
673 	 */
674 	res = 0;
675 	avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
676 	while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
677 	    res += (rawp->recsize + 15) & ~15;
678 	    if (res >= avail) {
679 		KKASSERT(res == avail);
680 		break;
681 	    }
682 	    rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
683 	}
684 
685 	/*
686 	 * Issue the write and deal with any errors or other conditions.
687 	 * For now assume blocking I/O.  Since we are record-aware the
688 	 * code cannot yet handle partial writes.
689 	 *
690 	 * We bump rindex prior to issuing the write to avoid racing
691 	 * the acknowledgement coming back (which could prevent the ack
692 	 * from bumping xindex).  Restarts are always based on xindex so
693 	 * we do not try to undo the rindex if an error occurs.
694 	 *
695 	 * XXX EWOULDBLOCK/NBIO
696 	 * XXX notification on failure
697 	 * XXX permanent verses temporary failures
698 	 * XXX two-way acknowledgement stream in the return direction / xindex
699 	 */
700 	bytes = res;
701 	jo->fifo.rindex += bytes;
702 	error = fp_write(jo->fp,
703 			jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
704 			bytes, &res);
705 	if (error) {
706 	    printf("journal_thread(%s) write, error %d\n", jo->id, error);
707 	    /* XXX */
708 	} else {
709 	    KKASSERT(res == bytes);
710 	}
711 
712 	/*
713 	 * Advance rindex.  If the journal stream is not full duplex we also
714 	 * advance xindex, otherwise the rjournal thread is responsible for
715 	 * advancing xindex.
716 	 */
717 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
718 	    jo->fifo.xindex += bytes;
719 	    jo->total_acked += bytes;
720 	}
721 	KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
722 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
723 	    if (jo->flags & MC_JOURNAL_WWAIT) {
724 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
725 		wakeup(&jo->fifo.windex);
726 	    }
727 	}
728     }
729     fp_shutdown(jo->fp, SHUT_WR);
730     jo->flags &= ~MC_JOURNAL_WACTIVE;
731     wakeup(jo);
732     wakeup(&jo->fifo.windex);
733 }
734 
735 /*
736  * A second per-journal worker thread is created for two-way journaling
737  * streams to deal with the return acknowledgement stream.
738  */
739 static void
740 journal_rthread(void *info)
741 {
742     struct journal_rawrecbeg *rawp;
743     struct journal_ackrecord ack;
744     struct journal *jo = info;
745     int64_t transid;
746     int error;
747     int count;
748     int bytes;
749 
750     transid = 0;
751     error = 0;
752 
753     for (;;) {
754 	/*
755 	 * We have been asked to stop
756 	 */
757 	if (jo->flags & MC_JOURNAL_STOP_REQ)
758 		break;
759 
760 	/*
761 	 * If we have no active transaction id, get one from the return
762 	 * stream.
763 	 */
764 	if (transid == 0) {
765 	    error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
766 #if 0
767 	    printf("fp_read ack error %d count %d\n", error, count);
768 #endif
769 	    if (error || count != sizeof(ack))
770 		break;
771 	    if (error) {
772 		printf("read error %d on receive stream\n", error);
773 		break;
774 	    }
775 	    if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
776 		ack.rend.endmagic != JREC_ENDMAGIC
777 	    ) {
778 		printf("bad begmagic or endmagic on receive stream\n");
779 		break;
780 	    }
781 	    transid = ack.rbeg.transid;
782 	}
783 
784 	/*
785 	 * Calculate the number of unacknowledged bytes.  If there are no
786 	 * unacknowledged bytes then unsent data was acknowledged, report,
787 	 * sleep a bit, and loop in that case.  This should not happen
788 	 * normally.  The ack record is thrown away.
789 	 */
790 	bytes = jo->fifo.rindex - jo->fifo.xindex;
791 
792 	if (bytes == 0) {
793 	    printf("warning: unsent data acknowledged transid %08llx\n", transid);
794 	    tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
795 	    transid = 0;
796 	    continue;
797 	}
798 
799 	/*
800 	 * Since rindex has advanced, the record pointed to by xindex
801 	 * must be a valid record.
802 	 */
803 	rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
804 	KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
805 	KKASSERT(rawp->recsize <= bytes);
806 
807 	/*
808 	 * The target can acknowledge several records at once.
809 	 */
810 	if (rawp->transid < transid) {
811 #if 1
812 	    printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
813 #endif
814 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
815 	    jo->total_acked += (rawp->recsize + 15) & ~15;
816 	    if (jo->flags & MC_JOURNAL_WWAIT) {
817 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
818 		wakeup(&jo->fifo.windex);
819 	    }
820 	    continue;
821 	}
822 	if (rawp->transid == transid) {
823 #if 1
824 	    printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
825 #endif
826 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
827 	    jo->total_acked += (rawp->recsize + 15) & ~15;
828 	    if (jo->flags & MC_JOURNAL_WWAIT) {
829 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
830 		wakeup(&jo->fifo.windex);
831 	    }
832 	    transid = 0;
833 	    continue;
834 	}
835 	printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
836 	transid = 0;
837     }
838     jo->flags &= ~MC_JOURNAL_RACTIVE;
839     wakeup(jo);
840     wakeup(&jo->fifo.windex);
841 }
842 
843 /*
844  * This builds a pad record which the journaling thread will skip over.  Pad
845  * records are required when we are unable to reserve sufficient stream space
846  * due to insufficient space at the end of the physical memory fifo.
847  *
848  * Even though the record is not transmitted, a normal transid must be
849  * assigned to it so link recovery operations after a failure work properly.
850  */
851 static
852 void
853 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
854 {
855     struct journal_rawrecend *rendp;
856 
857     KKASSERT((recsize & 15) == 0 && recsize >= 16);
858 
859     rawp->streamid = JREC_STREAMID_PAD;
860     rawp->recsize = recsize;	/* must be 16-byte aligned */
861     rawp->transid = transid;
862     /*
863      * WARNING, rendp may overlap rawp->transid.  This is necessary to
864      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
865      * hopefully cause the compiler to not make any assumptions.
866      */
867     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
868     rendp->endmagic = JREC_ENDMAGIC;
869     rendp->check = 0;
870     rendp->recsize = rawp->recsize;
871 
872     /*
873      * Set the begin magic last.  This is what will allow the journal
874      * thread to write the record out.  Use a store fence to prevent
875      * compiler and cpu reordering of the writes.
876      */
877     cpu_sfence();
878     rawp->begmagic = JREC_BEGMAGIC;
879 }
880 
881 /*
882  * Wake up the worker thread if the FIFO is more then half full or if
883  * someone is waiting for space to be freed up.  Otherwise let the
884  * heartbeat deal with it.  Being able to avoid waking up the worker
885  * is the key to the journal's cpu performance.
886  */
887 static __inline
888 void
889 journal_commit_wakeup(struct journal *jo)
890 {
891     int avail;
892 
893     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
894     KKASSERT(avail >= 0);
895     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
896 	wakeup(&jo->fifo);
897 }
898 
899 /*
900  * Create a new BEGIN stream record with the specified streamid and the
901  * specified amount of payload space.  *rawpp will be set to point to the
902  * base of the new stream record and a pointer to the base of the payload
903  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
904  * making this call.  The raw record header will be partially initialized.
905  *
906  * A stream can be extended, aborted, or committed by other API calls
907  * below.  This may result in a sequence of potentially disconnected
908  * stream records to be output to the journaling target.  The first record
909  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
910  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
911  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
912  * up being the same as the first, in which case the bits are all set in
913  * the first record.
914  *
915  * The stream record is created in an incomplete state by setting the begin
916  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
917  * flushing the fifo past our record until we have finished populating it.
918  * Other threads can reserve and operate on their own space without stalling
919  * but the stream output will stall until we have completed operations.  The
920  * memory FIFO is intended to be large enough to absorb such situations
921  * without stalling out other threads.
922  */
923 static
924 void *
925 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
926 		int16_t streamid, int bytes)
927 {
928     struct journal_rawrecbeg *rawp;
929     int avail;
930     int availtoend;
931     int req;
932 
933     /*
934      * Add header and trailer overheads to the passed payload.  Note that
935      * the passed payload size need not be aligned in any way.
936      */
937     bytes += sizeof(struct journal_rawrecbeg);
938     bytes += sizeof(struct journal_rawrecend);
939 
940     for (;;) {
941 	/*
942 	 * First, check boundary conditions.  If the request would wrap around
943 	 * we have to skip past the ending block and return to the beginning
944 	 * of the FIFO's buffer.  Calculate 'req' which is the actual number
945 	 * of bytes being reserved, including wrap-around dead space.
946 	 *
947 	 * Neither 'bytes' or 'req' are aligned.
948 	 *
949 	 * Note that availtoend is not truncated to avail and so cannot be
950 	 * used to determine whether the reservation is possible by itself.
951 	 * Also, since all fifo ops are 16-byte aligned, we can check
952 	 * the size before calculating the aligned size.
953 	 */
954 	availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
955 	KKASSERT((availtoend & 15) == 0);
956 	if (bytes > availtoend)
957 	    req = bytes + availtoend;	/* add pad to end */
958 	else
959 	    req = bytes;
960 
961 	/*
962 	 * Next calculate the total available space and see if it is
963 	 * sufficient.  We cannot overwrite previously buffered data
964 	 * past xindex because otherwise we would not be able to restart
965 	 * a broken link at the target's last point of commit.
966 	 */
967 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
968 	KKASSERT(avail >= 0 && (avail & 15) == 0);
969 
970 	if (avail < req) {
971 	    /* XXX MC_JOURNAL_STOP_IMM */
972 	    jo->flags |= MC_JOURNAL_WWAIT;
973 	    ++jo->fifostalls;
974 	    tsleep(&jo->fifo.windex, 0, "jwrite", 0);
975 	    continue;
976 	}
977 
978 	/*
979 	 * Create a pad record for any dead space and create an incomplete
980 	 * record for the live space, then return a pointer to the
981 	 * contiguous buffer space that was requested.
982 	 *
983 	 * NOTE: The worker thread will not flush past an incomplete
984 	 * record, so the reserved space can be filled in at-will.  The
985 	 * journaling code must also be aware the reserved sections occuring
986 	 * after this one will also not be written out even if completed
987 	 * until this one is completed.
988 	 *
989 	 * The transaction id must accomodate real and potential pad creation.
990 	 */
991 	rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
992 	if (req != bytes) {
993 	    journal_build_pad(rawp, availtoend, jo->transid);
994 	    ++jo->transid;
995 	    rawp = (void *)jo->fifo.membase;
996 	}
997 	rawp->begmagic = JREC_INCOMPLETEMAGIC;	/* updated by abort/commit */
998 	rawp->recsize = bytes;			/* (unaligned size) */
999 	rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
1000 	rawp->transid = jo->transid;
1001 	jo->transid += 2;
1002 
1003 	/*
1004 	 * Issue a memory barrier to guarentee that the record data has been
1005 	 * properly initialized before we advance the write index and return
1006 	 * a pointer to the reserved record.  Otherwise the worker thread
1007 	 * could accidently run past us.
1008 	 *
1009 	 * Note that stream records are always 16-byte aligned.
1010 	 */
1011 	cpu_sfence();
1012 	jo->fifo.windex += (req + 15) & ~15;
1013 	*rawpp = rawp;
1014 	return(rawp + 1);
1015     }
1016     /* not reached */
1017     *rawpp = NULL;
1018     return(NULL);
1019 }
1020 
1021 /*
1022  * Attempt to extend the stream record by <bytes> worth of payload space.
1023  *
1024  * If it is possible to extend the existing stream record no truncation
1025  * occurs and the record is extended as specified.  A pointer to the
1026  * truncation offset within the payload space is returned.
1027  *
1028  * If it is not possible to do this the existing stream record is truncated
1029  * and committed, and a new stream record of size <bytes> is created.  A
1030  * pointer to the base of the new stream record's payload space is returned.
1031  *
1032  * *rawpp is set to the new reservation in the case of a new record but
1033  * the caller cannot depend on a comparison with the old rawp to determine if
1034  * this case occurs because we could end up using the same memory FIFO
1035  * offset for the new stream record.  Use *newstreamrecp instead.
1036  */
1037 static void *
1038 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp,
1039 		int truncbytes, int bytes, int *newstreamrecp)
1040 {
1041     struct journal_rawrecbeg *rawp;
1042     int16_t streamid;
1043     int availtoend;
1044     int avail;
1045     int osize;
1046     int nsize;
1047     int wbase;
1048     void *rptr;
1049 
1050     *newstreamrecp = 0;
1051     rawp = *rawpp;
1052     osize = (rawp->recsize + 15) & ~15;
1053     nsize = (rawp->recsize + bytes + 15) & ~15;
1054     wbase = (char *)rawp - jo->fifo.membase;
1055 
1056     /*
1057      * If the aligned record size does not change we can trivially adjust
1058      * the record size.
1059      */
1060     if (nsize == osize) {
1061 	rawp->recsize += bytes;
1062 	return((char *)(rawp + 1) + truncbytes);
1063     }
1064 
1065     /*
1066      * If the fifo's write index hasn't been modified since we made the
1067      * reservation and we do not hit any boundary conditions, we can
1068      * trivially make the record smaller or larger.
1069      */
1070     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
1071 	availtoend = jo->fifo.size - wbase;
1072 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
1073 	KKASSERT((availtoend & 15) == 0);
1074 	KKASSERT((avail & 15) == 0);
1075 	if (nsize <= avail && nsize <= availtoend) {
1076 	    jo->fifo.windex += nsize - osize;
1077 	    rawp->recsize += bytes;
1078 	    return((char *)(rawp + 1) + truncbytes);
1079 	}
1080     }
1081 
1082     /*
1083      * It was not possible to extend the buffer.  Commit the current
1084      * buffer and create a new one.  We manually clear the BEGIN mark that
1085      * journal_reserve() creates (because this is a continuing record, not
1086      * the start of a new stream).
1087      */
1088     streamid = rawp->streamid & JREC_STREAMID_MASK;
1089     journal_commit(jo, rawpp, truncbytes, 0);
1090     rptr = journal_reserve(jo, rawpp, streamid, bytes);
1091     rawp = *rawpp;
1092     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
1093     *newstreamrecp = 1;
1094     return(rptr);
1095 }
1096 
1097 /*
1098  * Abort a journal record.  If the transaction record represents a stream
1099  * BEGIN and we can reverse the fifo's write index we can simply reverse
1100  * index the entire record, as if it were never reserved in the first place.
1101  *
1102  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
1103  * with the payload truncated to 0 bytes.
1104  */
1105 static void
1106 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
1107 {
1108     struct journal_rawrecbeg *rawp;
1109     int osize;
1110 
1111     rawp = *rawpp;
1112     osize = (rawp->recsize + 15) & ~15;
1113 
1114     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
1115 	(jo->fifo.windex & jo->fifo.mask) ==
1116 	 (char *)rawp - jo->fifo.membase + osize)
1117     {
1118 	jo->fifo.windex -= osize;
1119 	*rawpp = NULL;
1120     } else {
1121 	rawp->streamid |= JREC_STREAMCTL_ABORTED;
1122 	journal_commit(jo, rawpp, 0, 1);
1123     }
1124 }
1125 
1126 /*
1127  * Commit a journal record and potentially truncate it to the specified
1128  * number of payload bytes.  If you do not want to truncate the record,
1129  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
1130  * field includes header and trailer and will not be correct.  Note that
1131  * passing 0 will truncate the entire data payload of the record.
1132  *
1133  * The logical stream is terminated by this function.
1134  *
1135  * If truncation occurs, and it is not possible to physically optimize the
1136  * memory FIFO due to other threads having reserved space after ours,
1137  * the remaining reserved space will be covered by a pad record.
1138  */
1139 static void
1140 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
1141 		int bytes, int closeout)
1142 {
1143     struct journal_rawrecbeg *rawp;
1144     struct journal_rawrecend *rendp;
1145     int osize;
1146     int nsize;
1147 
1148     rawp = *rawpp;
1149     *rawpp = NULL;
1150 
1151     KKASSERT((char *)rawp >= jo->fifo.membase &&
1152 	     (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
1153     KKASSERT(((intptr_t)rawp & 15) == 0);
1154 
1155     /*
1156      * Truncate the record if necessary.  If the FIFO write index as still
1157      * at the end of our record we can optimally backindex it.  Otherwise
1158      * we have to insert a pad record to cover the dead space.
1159      *
1160      * We calculate osize which is the 16-byte-aligned original recsize.
1161      * We calculate nsize which is the 16-byte-aligned new recsize.
1162      *
1163      * Due to alignment issues or in case the passed truncation bytes is
1164      * the same as the original payload, nsize may be equal to osize even
1165      * if the committed bytes is less then the originally reserved bytes.
1166      */
1167     if (bytes >= 0) {
1168 	KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
1169 	osize = (rawp->recsize + 15) & ~15;
1170 	rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
1171 			sizeof(struct journal_rawrecend);
1172 	nsize = (rawp->recsize + 15) & ~15;
1173 	KKASSERT(nsize <= osize);
1174 	if (osize == nsize) {
1175 	    /* do nothing */
1176 	} else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
1177 	    /* we are able to backindex the fifo */
1178 	    jo->fifo.windex -= osize - nsize;
1179 	} else {
1180 	    /* we cannot backindex the fifo, emplace a pad in the dead space */
1181 	    journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
1182 				rawp->transid + 1);
1183 	}
1184     }
1185 
1186     /*
1187      * Fill in the trailer.  Note that unlike pad records, the trailer will
1188      * never overlap the header.
1189      */
1190     rendp = (void *)((char *)rawp +
1191 	    ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
1192     rendp->endmagic = JREC_ENDMAGIC;
1193     rendp->recsize = rawp->recsize;
1194     rendp->check = 0;		/* XXX check word, disabled for now */
1195 
1196     /*
1197      * Fill in begmagic last.  This will allow the worker thread to proceed.
1198      * Use a memory barrier to guarentee write ordering.  Mark the stream
1199      * as terminated if closeout is set.  This is the typical case.
1200      */
1201     if (closeout)
1202 	rawp->streamid |= JREC_STREAMCTL_END;
1203     cpu_sfence();		/* memory and compiler barrier */
1204     rawp->begmagic = JREC_BEGMAGIC;
1205 
1206     journal_commit_wakeup(jo);
1207 }
1208 
1209 /************************************************************************
1210  *			PARALLEL TRANSACTION SUPPORT ROUTINES		*
1211  ************************************************************************
1212  *
1213  * JRECLIST_*() - routines which create and iterate over jrecord structures,
1214  *		  because a mount point may have multiple attached journals.
1215  */
1216 
1217 /*
1218  * Initialize the passed jrecord_list and create a jrecord for each
1219  * journal we need to write to.  Unnecessary mallocs are avoided by
1220  * using the passed jrecord structure as the first jrecord in the list.
1221  * A starting transaction is pushed for each jrecord.
1222  *
1223  * Returns non-zero if any of the journals require undo records.
1224  */
1225 static
1226 int
1227 jreclist_init(struct mount *mp, struct jrecord_list *jreclist,
1228 	      struct jrecord *jreccache, int16_t rectype)
1229 {
1230     struct journal *jo;
1231     struct jrecord *jrec;
1232     int wantrev = 0;
1233     int count = 0;
1234 
1235     TAILQ_INIT(jreclist);
1236     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1237 	if (count == 0)
1238 	    jrec = jreccache;
1239 	else
1240 	    jrec = malloc(sizeof(*jrec), M_JOURNAL, M_WAITOK);
1241 	jrecord_init(jo, jrec, -1);
1242 	jrec->user_save = jrecord_push(jrec, rectype);
1243 	TAILQ_INSERT_TAIL(jreclist, jrec, user_entry);
1244 	if (jo->flags & MC_JOURNAL_WANT_REVERSABLE)
1245 	    wantrev = 1;
1246 	++count;
1247     }
1248     return(wantrev);
1249 }
1250 
1251 /*
1252  * Terminate the journaled transactions started by jreclist_init().  If
1253  * an error occured, the transaction records will be aborted.
1254  */
1255 static
1256 void
1257 jreclist_done(struct jrecord_list *jreclist, int error)
1258 {
1259     struct jrecord *jrec;
1260     int count;
1261 
1262     TAILQ_FOREACH(jrec, jreclist, user_entry) {
1263 	jrecord_pop(jrec, jrec->user_save);
1264 	jrecord_done(jrec, error);
1265     }
1266     count = 0;
1267     while ((jrec = TAILQ_FIRST(jreclist)) != NULL) {
1268 	TAILQ_REMOVE(jreclist, jrec, user_entry);
1269 	if (count)
1270 	    free(jrec, M_JOURNAL);
1271 	++count;
1272     }
1273 }
1274 
1275 /*
1276  * This procedure writes out UNDO records for available reversable
1277  * journals.
1278  *
1279  * XXX could use improvement.  There is no need to re-read the file
1280  * for each journal.
1281  */
1282 static
1283 void
1284 jreclist_undo_file(struct jrecord_list *jreclist, struct vnode *vp,
1285 		   int jrflags, off_t off, off_t bytes)
1286 {
1287     struct jrecord *jrec;
1288     int error;
1289 
1290     error = 0;
1291     if (jrflags & JRUNDO_GETVP)
1292 	error = vget(vp, LK_SHARED);
1293     if (error == 0) {
1294 	TAILQ_FOREACH(jrec, jreclist, user_entry) {
1295 	    if (jrec->jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
1296 		jrecord_undo_file(jrec, vp, jrflags, off, bytes);
1297 	    }
1298 	}
1299     }
1300     if (error == 0 && jrflags & JRUNDO_GETVP)
1301 	vput(vp);
1302 }
1303 
1304 /************************************************************************
1305  *			TRANSACTION SUPPORT ROUTINES			*
1306  ************************************************************************
1307  *
1308  * JRECORD_*() - routines to create subrecord transactions and embed them
1309  *		 in the logical streams managed by the journal_*() routines.
1310  */
1311 
1312 static int16_t sid = JREC_STREAMID_JMIN;
1313 
1314 /*
1315  * Initialize the passed jrecord structure and start a new stream transaction
1316  * by reserving an initial build space in the journal's memory FIFO.
1317  */
1318 static void
1319 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
1320 {
1321     bzero(jrec, sizeof(*jrec));
1322     jrec->jo = jo;
1323     if (streamid < 0) {
1324 	streamid = sid++;	/* XXX need to track stream ids! */
1325 	if (sid == JREC_STREAMID_JMAX)
1326 	    sid = JREC_STREAMID_JMIN;
1327     }
1328     jrec->streamid = streamid;
1329     jrec->stream_residual = JREC_DEFAULTSIZE;
1330     jrec->stream_reserved = jrec->stream_residual;
1331     jrec->stream_ptr =
1332 	journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
1333 }
1334 
1335 /*
1336  * Push a recursive record type.  All pushes should have matching pops.
1337  * The old parent is returned and the newly pushed record becomes the
1338  * new parent.  Note that the old parent's pointer may already be invalid
1339  * or may become invalid if jrecord_write() had to build a new stream
1340  * record, so the caller should not mess with the returned pointer in
1341  * any way other then to save it.
1342  */
1343 static
1344 struct journal_subrecord *
1345 jrecord_push(struct jrecord *jrec, int16_t rectype)
1346 {
1347     struct journal_subrecord *save;
1348 
1349     save = jrec->parent;
1350     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
1351     jrec->last = NULL;
1352     KKASSERT(jrec->parent != NULL);
1353     ++jrec->pushcount;
1354     ++jrec->pushptrgood;	/* cleared on flush */
1355     return(save);
1356 }
1357 
1358 /*
1359  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
1360  * on the last record written within the subtransaction.  If the last
1361  * record written is not accessible or if the subtransaction is empty,
1362  * we must write out a pad record with JMASK_LAST set before popping.
1363  *
1364  * When popping a subtransaction the parent record's recsize field
1365  * will be properly set.  If the parent pointer is no longer valid
1366  * (which can occur if the data has already been flushed out to the
1367  * stream), the protocol spec allows us to leave it 0.
1368  *
1369  * The saved parent pointer which we restore may or may not be valid,
1370  * and if not valid may or may not be NULL, depending on the value
1371  * of pushptrgood.
1372  */
1373 static void
1374 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
1375 {
1376     struct journal_subrecord *last;
1377 
1378     KKASSERT(jrec->pushcount > 0);
1379     KKASSERT(jrec->residual == 0);
1380 
1381     /*
1382      * Set JMASK_LAST on the last record we wrote at the current
1383      * level.  If last is NULL we either no longer have access to the
1384      * record or the subtransaction was empty and we must write out a pad
1385      * record.
1386      */
1387     if ((last = jrec->last) == NULL) {
1388 	jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
1389 	last = jrec->last;	/* reload after possible flush */
1390     } else {
1391 	last->rectype |= JMASK_LAST;
1392     }
1393 
1394     /*
1395      * pushptrgood tells us how many levels of parent record pointers
1396      * are valid.  The jrec only stores the current parent record pointer
1397      * (and it is only valid if pushptrgood != 0).  The higher level parent
1398      * record pointers are saved by the routines calling jrecord_push() and
1399      * jrecord_pop().  These pointers may become stale and we determine
1400      * that fact by tracking the count of valid parent pointers with
1401      * pushptrgood.  Pointers become invalid when their related stream
1402      * record gets pushed out.
1403      *
1404      * If no pointer is available (the data has already been pushed out),
1405      * then no fixup of e.g. the length field is possible for non-leaf
1406      * nodes.  The protocol allows for this situation by placing a larger
1407      * burden on the program scanning the stream on the other end.
1408      *
1409      * [parentA]
1410      *	  [node X]
1411      *    [parentB]
1412      *	     [node Y]
1413      *	     [node Z]
1414      *    (pop B)	see NOTE B
1415      * (pop A)		see NOTE A
1416      *
1417      * NOTE B:	This pop sets LAST in node Z if the node is still accessible,
1418      *		else a PAD record is appended and LAST is set in that.
1419      *
1420      *		This pop sets the record size in parentB if parentB is still
1421      *		accessible, else the record size is left 0 (the scanner must
1422      *		deal with that).
1423      *
1424      *		This pop sets the new 'last' record to parentB, the pointer
1425      *		to which may or may not still be accessible.
1426      *
1427      * NOTE A:	This pop sets LAST in parentB if the node is still accessible,
1428      *		else a PAD record is appended and LAST is set in that.
1429      *
1430      *		This pop sets the record size in parentA if parentA is still
1431      *		accessible, else the record size is left 0 (the scanner must
1432      *		deal with that).
1433      *
1434      *		This pop sets the new 'last' record to parentA, the pointer
1435      *		to which may or may not still be accessible.
1436      *
1437      * Also note that the last record in the stream transaction, which in
1438      * the above example is parentA, does not currently have the LAST bit
1439      * set.
1440      *
1441      * The current parent becomes the last record relative to the
1442      * saved parent passed into us.  It's validity is based on
1443      * whether pushptrgood is non-zero prior to decrementing.  The saved
1444      * parent becomes the new parent, and its validity is based on whether
1445      * pushptrgood is non-zero after decrementing.
1446      *
1447      * The old jrec->parent may be NULL if it is no longer accessible.
1448      * If pushptrgood is non-zero, however, it is guarenteed to not
1449      * be NULL (since no flush occured).
1450      */
1451     jrec->last = jrec->parent;
1452     --jrec->pushcount;
1453     if (jrec->pushptrgood) {
1454 	KKASSERT(jrec->last != NULL && last != NULL);
1455 	if (--jrec->pushptrgood == 0) {
1456 	    jrec->parent = NULL;	/* 'save' contains garbage or NULL */
1457 	} else {
1458 	    KKASSERT(save != NULL);
1459 	    jrec->parent = save;	/* 'save' must not be NULL */
1460 	}
1461 
1462 	/*
1463 	 * Set the record size in the old parent.  'last' still points to
1464 	 * the original last record in the subtransaction being popped,
1465 	 * jrec->last points to the old parent (which became the last
1466 	 * record relative to the new parent being popped into).
1467 	 */
1468 	jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1469     } else {
1470 	jrec->parent = NULL;
1471 	KKASSERT(jrec->last == NULL);
1472     }
1473 }
1474 
1475 /*
1476  * Write out a leaf record, including associated data.
1477  */
1478 static
1479 void
1480 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
1481 {
1482     jrecord_write(jrec, rectype, bytes);
1483     jrecord_data(jrec, ptr, bytes);
1484 }
1485 
1486 /*
1487  * Write a leaf record out and return a pointer to its base.  The leaf
1488  * record may contain potentially megabytes of data which is supplied
1489  * in jrecord_data() calls.  The exact amount must be specified in this
1490  * call.
1491  *
1492  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
1493  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
1494  * USE THE RETURN VALUE.
1495  */
1496 static
1497 struct journal_subrecord *
1498 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1499 {
1500     struct journal_subrecord *last;
1501     int pusheditout;
1502 
1503     /*
1504      * Try to catch some obvious errors.  Nesting records must specify a
1505      * size of 0, and there should be no left-overs from previous operations
1506      * (such as incomplete data writeouts).
1507      */
1508     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1509     KKASSERT(jrec->residual == 0);
1510 
1511     /*
1512      * Check to see if the current stream record has enough room for
1513      * the new subrecord header.  If it doesn't we extend the current
1514      * stream record.
1515      *
1516      * This may have the side effect of pushing out the current stream record
1517      * and creating a new one.  We must adjust our stream tracking fields
1518      * accordingly.
1519      */
1520     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1521 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1522 				jrec->stream_reserved - jrec->stream_residual,
1523 				JREC_DEFAULTSIZE, &pusheditout);
1524 	if (pusheditout) {
1525 	    /*
1526 	     * If a pushout occured, the pushed out stream record was
1527 	     * truncated as specified and the new record is exactly the
1528 	     * extension size specified.
1529 	     */
1530 	    jrec->stream_reserved = JREC_DEFAULTSIZE;
1531 	    jrec->stream_residual = JREC_DEFAULTSIZE;
1532 	    jrec->parent = NULL;	/* no longer accessible */
1533 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1534 	} else {
1535 	    /*
1536 	     * If no pushout occured the stream record is NOT truncated and
1537 	     * IS extended.
1538 	     */
1539 	    jrec->stream_reserved += JREC_DEFAULTSIZE;
1540 	    jrec->stream_residual += JREC_DEFAULTSIZE;
1541 	}
1542     }
1543     last = (void *)jrec->stream_ptr;
1544     last->rectype = rectype;
1545     last->reserved = 0;
1546 
1547     /*
1548      * We may not know the record size for recursive records and the
1549      * header may become unavailable due to limited FIFO space.  Write
1550      * -1 to indicate this special case.
1551      */
1552     if ((rectype & JMASK_NESTED) && bytes == 0)
1553 	last->recsize = -1;
1554     else
1555 	last->recsize = sizeof(struct journal_subrecord) + bytes;
1556     jrec->last = last;
1557     jrec->residual = bytes;		/* remaining data to be posted */
1558     jrec->residual_align = -bytes & 7;	/* post-data alignment required */
1559     jrec->stream_ptr += sizeof(*last);	/* current write pointer */
1560     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1561     return(last);
1562 }
1563 
1564 /*
1565  * Write out the data associated with a leaf record.  Any number of calls
1566  * to this routine may be made as long as the byte count adds up to the
1567  * amount originally specified in jrecord_write().
1568  *
1569  * The act of writing out the leaf data may result in numerous stream records
1570  * being pushed out.   Callers should be aware that even the associated
1571  * subrecord header may become inaccessible due to stream record pushouts.
1572  */
1573 static void
1574 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1575 {
1576     int pusheditout;
1577     int extsize;
1578 
1579     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1580 
1581     /*
1582      * Push out stream records as long as there is insufficient room to hold
1583      * the remaining data.
1584      */
1585     while (jrec->stream_residual < bytes) {
1586 	/*
1587 	 * Fill in any remaining space in the current stream record.
1588 	 */
1589 	bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1590 	buf = (const char *)buf + jrec->stream_residual;
1591 	bytes -= jrec->stream_residual;
1592 	/*jrec->stream_ptr += jrec->stream_residual;*/
1593 	jrec->residual -= jrec->stream_residual;
1594 	jrec->stream_residual = 0;
1595 
1596 	/*
1597 	 * Try to extend the current stream record, but no more then 1/4
1598 	 * the size of the FIFO.
1599 	 */
1600 	extsize = jrec->jo->fifo.size >> 2;
1601 	if (extsize > bytes)
1602 	    extsize = (bytes + 15) & ~15;
1603 
1604 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1605 				jrec->stream_reserved - jrec->stream_residual,
1606 				extsize, &pusheditout);
1607 	if (pusheditout) {
1608 	    jrec->stream_reserved = extsize;
1609 	    jrec->stream_residual = extsize;
1610 	    jrec->parent = NULL;	/* no longer accessible */
1611 	    jrec->last = NULL;		/* no longer accessible */
1612 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1613 	} else {
1614 	    jrec->stream_reserved += extsize;
1615 	    jrec->stream_residual += extsize;
1616 	}
1617     }
1618 
1619     /*
1620      * Push out any remaining bytes into the current stream record.
1621      */
1622     if (bytes) {
1623 	bcopy(buf, jrec->stream_ptr, bytes);
1624 	jrec->stream_ptr += bytes;
1625 	jrec->stream_residual -= bytes;
1626 	jrec->residual -= bytes;
1627     }
1628 
1629     /*
1630      * Handle data alignment requirements for the subrecord.  Because the
1631      * stream record's data space is more strictly aligned, it must already
1632      * have sufficient space to hold any subrecord alignment slop.
1633      */
1634     if (jrec->residual == 0 && jrec->residual_align) {
1635 	KKASSERT(jrec->residual_align <= jrec->stream_residual);
1636 	bzero(jrec->stream_ptr, jrec->residual_align);
1637 	jrec->stream_ptr += jrec->residual_align;
1638 	jrec->stream_residual -= jrec->residual_align;
1639 	jrec->residual_align = 0;
1640     }
1641 }
1642 
1643 /*
1644  * We are finished with the transaction.  This closes the transaction created
1645  * by jrecord_init().
1646  *
1647  * NOTE: If abortit is not set then we must be at the top level with no
1648  *	 residual subrecord data left to output.
1649  *
1650  *	 If abortit is set then we can be in any state, all pushes will be
1651  *	 popped and it is ok for there to be residual data.  This works
1652  *	 because the virtual stream itself is truncated.  Scanners must deal
1653  *	 with this situation.
1654  *
1655  * The stream record will be committed or aborted as specified and jrecord
1656  * resources will be cleaned up.
1657  */
1658 static void
1659 jrecord_done(struct jrecord *jrec, int abortit)
1660 {
1661     KKASSERT(jrec->rawp != NULL);
1662 
1663     if (abortit) {
1664 	journal_abort(jrec->jo, &jrec->rawp);
1665     } else {
1666 	KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1667 	journal_commit(jrec->jo, &jrec->rawp,
1668 			jrec->stream_reserved - jrec->stream_residual, 1);
1669     }
1670 
1671     /*
1672      * jrec should not be used beyond this point without another init,
1673      * but clean up some fields to ensure that we panic if it is.
1674      *
1675      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1676      */
1677     jrec->jo = NULL;
1678     jrec->stream_ptr = NULL;
1679 }
1680 
1681 /************************************************************************
1682  *			LOW LEVEL RECORD SUPPORT ROUTINES		*
1683  ************************************************************************
1684  *
1685  * These routine create low level recursive and leaf subrecords representing
1686  * common filesystem structures.
1687  */
1688 
1689 /*
1690  * Write out a filename path relative to the base of the mount point.
1691  * rectype is typically JLEAF_PATH{1,2,3,4}.
1692  */
1693 static void
1694 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1695 {
1696     char buf[64];	/* local buffer if it fits, else malloced */
1697     char *base;
1698     int pathlen;
1699     int index;
1700     struct namecache *scan;
1701 
1702     /*
1703      * Pass 1 - figure out the number of bytes required.  Include terminating
1704      * 	       \0 on last element and '/' separator on other elements.
1705      */
1706 again:
1707     pathlen = 0;
1708     for (scan = ncp;
1709 	 scan && (scan->nc_flag & NCF_MOUNTPT) == 0;
1710 	 scan = scan->nc_parent
1711     ) {
1712 	pathlen += scan->nc_nlen + 1;
1713     }
1714 
1715     if (pathlen <= sizeof(buf))
1716 	base = buf;
1717     else
1718 	base = malloc(pathlen, M_TEMP, M_INTWAIT);
1719 
1720     /*
1721      * Pass 2 - generate the path buffer
1722      */
1723     index = pathlen;
1724     for (scan = ncp;
1725 	 scan && (scan->nc_flag & NCF_MOUNTPT) == 0;
1726 	 scan = scan->nc_parent
1727     ) {
1728 	if (scan->nc_nlen >= index) {
1729 	    if (base != buf)
1730 		free(base, M_TEMP);
1731 	    goto again;
1732 	}
1733 	if (index == pathlen)
1734 	    base[--index] = 0;
1735 	else
1736 	    base[--index] = '/';
1737 	index -= scan->nc_nlen;
1738 	bcopy(scan->nc_name, base + index, scan->nc_nlen);
1739     }
1740     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1741     if (base != buf)
1742 	free(base, M_TEMP);
1743 }
1744 
1745 /*
1746  * Write out a file attribute structure.  While somewhat inefficient, using
1747  * a recursive data structure is the most portable and extensible way.
1748  */
1749 static void
1750 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1751 {
1752     void *save;
1753 
1754     save = jrecord_push(jrec, JTYPE_VATTR);
1755     if (vat->va_type != VNON)
1756 	jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1757     if (vat->va_mode != (mode_t)VNOVAL)
1758 	jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1759     if (vat->va_nlink != VNOVAL)
1760 	jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1761     if (vat->va_uid != VNOVAL)
1762 	jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1763     if (vat->va_gid != VNOVAL)
1764 	jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1765     if (vat->va_fsid != VNOVAL)
1766 	jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1767     if (vat->va_fileid != VNOVAL)
1768 	jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1769     if (vat->va_size != VNOVAL)
1770 	jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1771     if (vat->va_atime.tv_sec != VNOVAL)
1772 	jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1773     if (vat->va_mtime.tv_sec != VNOVAL)
1774 	jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1775     if (vat->va_ctime.tv_sec != VNOVAL)
1776 	jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1777     if (vat->va_gen != VNOVAL)
1778 	jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1779     if (vat->va_flags != VNOVAL)
1780 	jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1781     if (vat->va_rdev != VNOVAL)
1782 	jrecord_leaf(jrec, JLEAF_UDEV, &vat->va_rdev, sizeof(vat->va_rdev));
1783 #if 0
1784     if (vat->va_filerev != VNOVAL)
1785 	jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1786 #endif
1787     jrecord_pop(jrec, save);
1788 }
1789 
1790 /*
1791  * Write out the creds used to issue a file operation.  If a process is
1792  * available write out additional tracking information related to the
1793  * process.
1794  *
1795  * XXX additional tracking info
1796  * XXX tty line info
1797  */
1798 static void
1799 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1800 {
1801     void *save;
1802     struct proc *p;
1803 
1804     save = jrecord_push(jrec, JTYPE_CRED);
1805     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1806     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1807     if (td && (p = td->td_proc) != NULL) {
1808 	jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1809 	jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1810     }
1811     jrecord_pop(jrec, save);
1812 }
1813 
1814 /*
1815  * Write out information required to identify a vnode
1816  *
1817  * XXX this needs work.  We should write out the inode number as well,
1818  * and in fact avoid writing out the file path for seqential writes
1819  * occuring within e.g. a certain period of time.
1820  */
1821 static void
1822 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1823 {
1824     struct namecache *ncp;
1825 
1826     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1827 	if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1828 	    break;
1829     }
1830     if (ncp)
1831 	jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1832 }
1833 
1834 static void
1835 jrecord_write_vnode_link(struct jrecord *jrec, struct vnode *vp,
1836 			 struct namecache *notncp)
1837 {
1838     struct namecache *ncp;
1839 
1840     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1841 	if (ncp == notncp)
1842 	    continue;
1843 	if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1844 	    break;
1845     }
1846     if (ncp)
1847 	jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1848 }
1849 
1850 #if 0
1851 /*
1852  * Write out the current contents of the file within the specified
1853  * range.  This is typically called from within an UNDO section.  A
1854  * locked vnode must be passed.
1855  */
1856 static int
1857 jrecord_write_filearea(struct jrecord *jrec, struct vnode *vp,
1858 			off_t begoff, off_t endoff)
1859 {
1860 }
1861 #endif
1862 
1863 /*
1864  * Write out the data represented by a pagelist
1865  */
1866 static void
1867 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1868 			struct vm_page **pglist, int *rtvals, int pgcount,
1869 			off_t offset)
1870 {
1871     struct msf_buf *msf;
1872     int error;
1873     int b;
1874     int i;
1875 
1876     i = 0;
1877     while (i < pgcount) {
1878 	/*
1879 	 * Find the next valid section.  Skip any invalid elements
1880 	 */
1881 	if (rtvals[i] != VM_PAGER_OK) {
1882 	    ++i;
1883 	    offset += PAGE_SIZE;
1884 	    continue;
1885 	}
1886 
1887 	/*
1888 	 * Figure out how big the valid section is, capping I/O at what the
1889 	 * MSFBUF can represent.
1890 	 */
1891 	b = i;
1892 	while (i < pgcount && i - b != XIO_INTERNAL_PAGES &&
1893 	       rtvals[i] == VM_PAGER_OK
1894 	) {
1895 	    ++i;
1896 	}
1897 
1898 	/*
1899 	 * And write it out.
1900 	 */
1901 	if (i - b) {
1902 	    error = msf_map_pagelist(&msf, pglist + b, i - b, 0);
1903 	    if (error == 0) {
1904 		printf("RECORD PUTPAGES %d\n", msf_buf_bytes(msf));
1905 		jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1906 		jrecord_leaf(jrec, rectype,
1907 			     msf_buf_kva(msf), msf_buf_bytes(msf));
1908 		msf_buf_free(msf);
1909 	    } else {
1910 		printf("jrecord_write_pagelist: mapping failure\n");
1911 	    }
1912 	    offset += (off_t)(i - b) << PAGE_SHIFT;
1913 	}
1914     }
1915 }
1916 
1917 /*
1918  * Write out the data represented by a UIO.
1919  */
1920 struct jwuio_info {
1921     struct jrecord *jrec;
1922     int16_t rectype;
1923 };
1924 
1925 static int jrecord_write_uio_callback(void *info, char *buf, int bytes);
1926 
1927 static void
1928 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1929 {
1930     struct jwuio_info info = { jrec, rectype };
1931     int error;
1932 
1933     if (uio->uio_segflg != UIO_NOCOPY) {
1934 	jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset,
1935 		     sizeof(uio->uio_offset));
1936 	error = msf_uio_iterate(uio, jrecord_write_uio_callback, &info);
1937 	if (error)
1938 	    printf("XXX warning uio iterate failed %d\n", error);
1939     }
1940 }
1941 
1942 static int
1943 jrecord_write_uio_callback(void *info_arg, char *buf, int bytes)
1944 {
1945     struct jwuio_info *info = info_arg;
1946 
1947     jrecord_leaf(info->jrec, info->rectype, buf, bytes);
1948     return(0);
1949 }
1950 
1951 static void
1952 jrecord_file_data(struct jrecord *jrec, struct vnode *vp,
1953 		  off_t off, off_t bytes)
1954 {
1955     const int bufsize = 8192;
1956     char *buf;
1957     int error;
1958     int n;
1959 
1960     buf = malloc(bufsize, M_JOURNAL, M_WAITOK);
1961     jrecord_leaf(jrec, JLEAF_SEEKPOS, &off, sizeof(off));
1962     while (bytes) {
1963 	n = (bytes > bufsize) ? bufsize : (int)bytes;
1964 	error = vn_rdwr(UIO_READ, vp, buf, n, off, UIO_SYSSPACE, IO_NODELOCKED,
1965 			proc0.p_ucred, NULL);
1966 	if (error) {
1967 	    jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
1968 	    break;
1969 	}
1970 	jrecord_leaf(jrec, JLEAF_FILEDATA, buf, n);
1971 	bytes -= n;
1972 	off += n;
1973     }
1974     free(buf, M_JOURNAL);
1975 }
1976 
1977 /************************************************************************
1978  *			LOW LEVEL UNDO SUPPORT ROUTINE			*
1979  ************************************************************************
1980  *
1981  * This function is used to support UNDO records.  It will generate an
1982  * appropriate record with the requested portion of the file data.  Note
1983  * that file data is only recorded if JRUNDO_FILEDATA is passed.  If bytes
1984  * is -1, it will be set to the size of the file.
1985  */
1986 static void
1987 jrecord_undo_file(struct jrecord *jrec, struct vnode *vp, int jrflags,
1988 		  off_t off, off_t bytes)
1989 {
1990     struct vattr attr;
1991     void *save1; /* warning, save pointers do not always remain valid */
1992     void *save2;
1993     int error;
1994 
1995     /*
1996      * Setup.  Start the UNDO record, obtain a shared lock on the vnode,
1997      * and retrieve attribute info.
1998      */
1999     save1 = jrecord_push(jrec, JTYPE_UNDO);
2000     error = VOP_GETATTR(vp, &attr);
2001     if (error)
2002 	goto done;
2003 
2004     /*
2005      * Generate UNDO records as requested.
2006      */
2007     if (jrflags & JRUNDO_VATTR) {
2008 	save2 = jrecord_push(jrec, JTYPE_VATTR);
2009 	jrecord_leaf(jrec, JLEAF_VTYPE, &attr.va_type, sizeof(attr.va_type));
2010 	if ((jrflags & JRUNDO_NLINK) && attr.va_nlink != VNOVAL)
2011 	    jrecord_leaf(jrec, JLEAF_NLINK, &attr.va_nlink, sizeof(attr.va_nlink));
2012 	if ((jrflags & JRUNDO_SIZE) && attr.va_size != VNOVAL)
2013 	    jrecord_leaf(jrec, JLEAF_SIZE, &attr.va_size, sizeof(attr.va_size));
2014 	if ((jrflags & JRUNDO_UID) && attr.va_uid != VNOVAL)
2015 	    jrecord_leaf(jrec, JLEAF_UID, &attr.va_uid, sizeof(attr.va_uid));
2016 	if ((jrflags & JRUNDO_GID) && attr.va_gid != VNOVAL)
2017 	    jrecord_leaf(jrec, JLEAF_GID, &attr.va_gid, sizeof(attr.va_gid));
2018 	if ((jrflags & JRUNDO_FSID) && attr.va_fsid != VNOVAL)
2019 	    jrecord_leaf(jrec, JLEAF_FSID, &attr.va_fsid, sizeof(attr.va_fsid));
2020 	if ((jrflags & JRUNDO_MODES) && attr.va_mode != (mode_t)VNOVAL)
2021 	    jrecord_leaf(jrec, JLEAF_MODES, &attr.va_mode, sizeof(attr.va_mode));
2022 	if ((jrflags & JRUNDO_INUM) && attr.va_fileid != VNOVAL)
2023 	    jrecord_leaf(jrec, JLEAF_INUM, &attr.va_fileid, sizeof(attr.va_fileid));
2024 	if ((jrflags & JRUNDO_ATIME) && attr.va_atime.tv_sec != VNOVAL)
2025 	    jrecord_leaf(jrec, JLEAF_ATIME, &attr.va_atime, sizeof(attr.va_atime));
2026 	if ((jrflags & JRUNDO_MTIME) && attr.va_mtime.tv_sec != VNOVAL)
2027 	    jrecord_leaf(jrec, JLEAF_MTIME, &attr.va_mtime, sizeof(attr.va_mtime));
2028 	if ((jrflags & JRUNDO_CTIME) && attr.va_ctime.tv_sec != VNOVAL)
2029 	    jrecord_leaf(jrec, JLEAF_CTIME, &attr.va_ctime, sizeof(attr.va_ctime));
2030 	if ((jrflags & JRUNDO_GEN) && attr.va_gen != VNOVAL)
2031 	    jrecord_leaf(jrec, JLEAF_GEN, &attr.va_gen, sizeof(attr.va_gen));
2032 	if ((jrflags & JRUNDO_FLAGS) && attr.va_flags != VNOVAL)
2033 	    jrecord_leaf(jrec, JLEAF_FLAGS, &attr.va_flags, sizeof(attr.va_flags));
2034 	if ((jrflags & JRUNDO_UDEV) && attr.va_rdev != VNOVAL)
2035 	    jrecord_leaf(jrec, JLEAF_UDEV, &attr.va_rdev, sizeof(attr.va_rdev));
2036 	jrecord_pop(jrec, save2);
2037     }
2038 
2039     /*
2040      * Output the file data being overwritten by reading the file and
2041      * writing it out to the journal prior to the write operation.  We
2042      * do not need to write out data past the current file EOF.
2043      *
2044      * XXX support JRUNDO_CONDLINK - do not write out file data for files
2045      * with a link count > 1.  The undo code needs to locate the inode and
2046      * regenerate the hardlink.
2047      */
2048     if ((jrflags & JRUNDO_FILEDATA) && attr.va_type == VREG) {
2049 	if (attr.va_size != VNOVAL) {
2050 	    if (bytes == -1)
2051 		bytes = attr.va_size - off;
2052 	    if (off + bytes > attr.va_size)
2053 		bytes = attr.va_size - off;
2054 	    if (bytes > 0)
2055 		jrecord_file_data(jrec, vp, off, bytes);
2056 	} else {
2057 	    error = EINVAL;
2058 	}
2059     }
2060     if ((jrflags & JRUNDO_FILEDATA) && attr.va_type == VLNK) {
2061 	struct iovec aiov;
2062 	struct uio auio;
2063 	char *buf;
2064 
2065 	buf = malloc(PATH_MAX, M_JOURNAL, M_WAITOK);
2066 	aiov.iov_base = buf;
2067 	aiov.iov_len = PATH_MAX;
2068 	auio.uio_iov = &aiov;
2069 	auio.uio_iovcnt = 1;
2070 	auio.uio_offset = 0;
2071 	auio.uio_rw = UIO_READ;
2072 	auio.uio_segflg = UIO_SYSSPACE;
2073 	auio.uio_td = curthread;
2074 	auio.uio_resid = PATH_MAX;
2075 	error = VOP_READLINK(vp, &auio, proc0.p_ucred);
2076 	if (error == 0) {
2077 		jrecord_leaf(jrec, JLEAF_SYMLINKDATA, buf,
2078 				PATH_MAX - auio.uio_resid);
2079 	}
2080 	free(buf, M_JOURNAL);
2081     }
2082 done:
2083     if (error)
2084 	jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
2085     jrecord_pop(jrec, save1);
2086 }
2087 
2088 /************************************************************************
2089  *			JOURNAL VNOPS					*
2090  ************************************************************************
2091  *
2092  * These are function shims replacing the normal filesystem ops.  We become
2093  * responsible for calling the underlying filesystem ops.  We have the choice
2094  * of executing the underlying op first and then generating the journal entry,
2095  * or starting the journal entry, executing the underlying op, and then
2096  * either completing or aborting it.
2097  *
2098  * The journal is supposed to be a high-level entity, which generally means
2099  * identifying files by name rather then by inode.  Supplying both allows
2100  * the journal to be used both for inode-number-compatible 'mirrors' and
2101  * for simple filesystem replication.
2102  *
2103  * Writes are particularly difficult to deal with because a single write may
2104  * represent a hundred megabyte buffer or more, and both writes and truncations
2105  * require the 'old' data to be written out as well as the new data if the
2106  * log is reversable.  Other issues:
2107  *
2108  * - How to deal with operations on unlinked files (no path available),
2109  *   but which may still be filesystem visible due to hard links.
2110  *
2111  * - How to deal with modifications made via a memory map.
2112  *
2113  * - Future cache coherency support will require cache coherency API calls
2114  *   both prior to and after the call to the underlying VFS.
2115  *
2116  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
2117  * new VFS equivalents (NMKDIR).
2118  */
2119 
2120 /*
2121  * Journal vop_settattr { a_vp, a_vap, a_cred, a_td }
2122  */
2123 static
2124 int
2125 journal_setattr(struct vop_setattr_args *ap)
2126 {
2127     struct jrecord_list jreclist;
2128     struct jrecord jreccache;
2129     struct jrecord *jrec;
2130     struct mount *mp;
2131     void *save;
2132     int error;
2133 
2134     mp = ap->a_head.a_ops->vv_mount;
2135     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETATTR)) {
2136 	jreclist_undo_file(&jreclist, ap->a_vp, JRUNDO_VATTR, 0, 0);
2137     }
2138     error = vop_journal_operate_ap(&ap->a_head);
2139     if (error == 0) {
2140 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2141 	    jrecord_write_cred(jrec, curthread, ap->a_cred);
2142 	    jrecord_write_vnode_ref(jrec, ap->a_vp);
2143 	    save = jrecord_push(jrec, JTYPE_REDO);
2144 	    jrecord_write_vattr(jrec, ap->a_vap);
2145 	    jrecord_pop(jrec, save);
2146 	}
2147     }
2148     jreclist_done(&jreclist, error);
2149     return (error);
2150 }
2151 
2152 /*
2153  * Journal vop_write { a_vp, a_uio, a_ioflag, a_cred }
2154  */
2155 static
2156 int
2157 journal_write(struct vop_write_args *ap)
2158 {
2159     struct jrecord_list jreclist;
2160     struct jrecord jreccache;
2161     struct jrecord *jrec;
2162     struct mount *mp;
2163     struct uio uio_copy;
2164     struct iovec uio_one_iovec;
2165     void *save;
2166     int error;
2167 
2168     /*
2169      * This is really nasty.  UIO's don't retain sufficient information to
2170      * be reusable once they've gone through the VOP chain.  The iovecs get
2171      * cleared, so we have to copy the UIO.
2172      *
2173      * XXX fix the UIO code to not destroy iov's during a scan so we can
2174      *     reuse the uio over and over again.
2175      *
2176      * XXX UNDO code needs to journal the old data prior to the write.
2177      */
2178     uio_copy = *ap->a_uio;
2179     if (uio_copy.uio_iovcnt == 1) {
2180 	uio_one_iovec = ap->a_uio->uio_iov[0];
2181 	uio_copy.uio_iov = &uio_one_iovec;
2182     } else {
2183 	uio_copy.uio_iov = malloc(uio_copy.uio_iovcnt * sizeof(struct iovec),
2184 				    M_JOURNAL, M_WAITOK);
2185 	bcopy(ap->a_uio->uio_iov, uio_copy.uio_iov,
2186 		uio_copy.uio_iovcnt * sizeof(struct iovec));
2187     }
2188 
2189     /*
2190      * Write out undo data.  Note that uio_offset is incorrect if
2191      * IO_APPEND is set, but fortunately we have no undo file data to
2192      * write out in that case.
2193      */
2194     mp = ap->a_head.a_ops->vv_mount;
2195     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_WRITE)) {
2196 	if (ap->a_ioflag & IO_APPEND) {
2197 	    jreclist_undo_file(&jreclist, ap->a_vp, JRUNDO_SIZE|JRUNDO_MTIME, 0, 0);
2198 	} else {
2199 	    jreclist_undo_file(&jreclist, ap->a_vp,
2200 			       JRUNDO_FILEDATA|JRUNDO_SIZE|JRUNDO_MTIME,
2201 			       uio_copy.uio_offset, uio_copy.uio_resid);
2202 	}
2203     }
2204     error = vop_journal_operate_ap(&ap->a_head);
2205 
2206     /*
2207      * XXX bad hack to figure out the offset for O_APPEND writes (note:
2208      * uio field state after the VFS operation).
2209      */
2210     uio_copy.uio_offset = ap->a_uio->uio_offset -
2211 			  (uio_copy.uio_resid - ap->a_uio->uio_resid);
2212 
2213     /*
2214      * Output the write data to the journal.
2215      */
2216     if (error == 0) {
2217 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2218 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2219 	    jrecord_write_vnode_ref(jrec, ap->a_vp);
2220 	    save = jrecord_push(jrec, JTYPE_REDO);
2221 	    jrecord_write_uio(jrec, JLEAF_FILEDATA, &uio_copy);
2222 	    jrecord_pop(jrec, save);
2223 	}
2224     }
2225     jreclist_done(&jreclist, error);
2226 
2227     if (uio_copy.uio_iov != &uio_one_iovec)
2228 	free(uio_copy.uio_iov, M_JOURNAL);
2229     return (error);
2230 }
2231 
2232 /*
2233  * Journal vop_fsync { a_vp, a_waitfor, a_td }
2234  */
2235 static
2236 int
2237 journal_fsync(struct vop_fsync_args *ap)
2238 {
2239 #if 0
2240     struct mount *mp;
2241     struct journal *jo;
2242 #endif
2243     int error;
2244 
2245     error = vop_journal_operate_ap(&ap->a_head);
2246 #if 0
2247     mp = ap->a_head.a_ops->vv_mount;
2248     if (error == 0) {
2249 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2250 	    /* XXX synchronize pending journal records */
2251 	}
2252     }
2253 #endif
2254     return (error);
2255 }
2256 
2257 /*
2258  * Journal vop_putpages { a_vp, a_m, a_count, a_sync, a_rtvals, a_offset }
2259  *
2260  * note: a_count is in bytes.
2261  */
2262 static
2263 int
2264 journal_putpages(struct vop_putpages_args *ap)
2265 {
2266     struct jrecord_list jreclist;
2267     struct jrecord jreccache;
2268     struct jrecord *jrec;
2269     struct mount *mp;
2270     void *save;
2271     int error;
2272 
2273     mp = ap->a_head.a_ops->vv_mount;
2274     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_PUTPAGES) &&
2275 	ap->a_count > 0
2276     ) {
2277 	jreclist_undo_file(&jreclist, ap->a_vp,
2278 			   JRUNDO_FILEDATA|JRUNDO_SIZE|JRUNDO_MTIME,
2279 			   ap->a_offset, btoc(ap->a_count));
2280     }
2281     error = vop_journal_operate_ap(&ap->a_head);
2282     if (error == 0 && ap->a_count > 0) {
2283 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2284 	    jrecord_write_vnode_ref(jrec, ap->a_vp);
2285 	    save = jrecord_push(jrec, JTYPE_REDO);
2286 	    jrecord_write_pagelist(jrec, JLEAF_FILEDATA, ap->a_m, ap->a_rtvals,
2287 				   btoc(ap->a_count), ap->a_offset);
2288 	    jrecord_pop(jrec, save);
2289 	}
2290     }
2291     jreclist_done(&jreclist, error);
2292     return (error);
2293 }
2294 
2295 /*
2296  * Journal vop_setacl { a_vp, a_type, a_aclp, a_cred, a_td }
2297  */
2298 static
2299 int
2300 journal_setacl(struct vop_setacl_args *ap)
2301 {
2302     struct jrecord_list jreclist;
2303     struct jrecord jreccache;
2304     struct jrecord *jrec;
2305     struct mount *mp;
2306     int error;
2307 
2308     mp = ap->a_head.a_ops->vv_mount;
2309     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETACL);
2310     error = vop_journal_operate_ap(&ap->a_head);
2311     if (error == 0) {
2312 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2313 #if 0
2314 	    if ((jo->flags & MC_JOURNAL_WANT_REVERSABLE))
2315 		jrecord_undo_file(jrec, ap->a_vp, JRUNDO_XXX, 0, 0);
2316 #endif
2317 	    jrecord_write_cred(jrec, curthread, ap->a_cred);
2318 	    jrecord_write_vnode_ref(jrec, ap->a_vp);
2319 #if 0
2320 	    save = jrecord_push(jrec, JTYPE_REDO);
2321 	    /* XXX type, aclp */
2322 	    jrecord_pop(jrec, save);
2323 #endif
2324 	}
2325     }
2326     jreclist_done(&jreclist, error);
2327     return (error);
2328 }
2329 
2330 /*
2331  * Journal vop_setextattr { a_vp, a_name, a_uio, a_cred, a_td }
2332  */
2333 static
2334 int
2335 journal_setextattr(struct vop_setextattr_args *ap)
2336 {
2337     struct jrecord_list jreclist;
2338     struct jrecord jreccache;
2339     struct jrecord *jrec;
2340     struct mount *mp;
2341     void *save;
2342     int error;
2343 
2344     mp = ap->a_head.a_ops->vv_mount;
2345     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETEXTATTR);
2346     error = vop_journal_operate_ap(&ap->a_head);
2347     if (error == 0) {
2348 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2349 #if 0
2350 	    if ((jo->flags & MC_JOURNAL_WANT_REVERSABLE))
2351 		jrecord_undo_file(jrec, ap->a_vp, JRUNDO_XXX, 0, 0);
2352 #endif
2353 	    jrecord_write_cred(jrec, curthread, ap->a_cred);
2354 	    jrecord_write_vnode_ref(jrec, ap->a_vp);
2355 	    jrecord_leaf(jrec, JLEAF_ATTRNAME, ap->a_name, strlen(ap->a_name));
2356 	    save = jrecord_push(jrec, JTYPE_REDO);
2357 	    jrecord_write_uio(jrec, JLEAF_FILEDATA, ap->a_uio);
2358 	    jrecord_pop(jrec, save);
2359 	}
2360     }
2361     jreclist_done(&jreclist, error);
2362     return (error);
2363 }
2364 
2365 /*
2366  * Journal vop_ncreate { a_ncp, a_vpp, a_cred, a_vap }
2367  */
2368 static
2369 int
2370 journal_ncreate(struct vop_ncreate_args *ap)
2371 {
2372     struct jrecord_list jreclist;
2373     struct jrecord jreccache;
2374     struct jrecord *jrec;
2375     struct mount *mp;
2376     void *save;
2377     int error;
2378 
2379     mp = ap->a_head.a_ops->vv_mount;
2380     jreclist_init(mp, &jreclist, &jreccache, JTYPE_CREATE);
2381     error = vop_journal_operate_ap(&ap->a_head);
2382     if (error == 0) {
2383 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2384 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2385 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2386 	    if (*ap->a_vpp)
2387 		jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2388 	    save = jrecord_push(jrec, JTYPE_REDO);
2389 	    jrecord_write_vattr(jrec, ap->a_vap);
2390 	    jrecord_pop(jrec, save);
2391 	}
2392     }
2393     jreclist_done(&jreclist, error);
2394     return (error);
2395 }
2396 
2397 /*
2398  * Journal vop_nmknod { a_ncp, a_vpp, a_cred, a_vap }
2399  */
2400 static
2401 int
2402 journal_nmknod(struct vop_nmknod_args *ap)
2403 {
2404     struct jrecord_list jreclist;
2405     struct jrecord jreccache;
2406     struct jrecord *jrec;
2407     struct mount *mp;
2408     void *save;
2409     int error;
2410 
2411     mp = ap->a_head.a_ops->vv_mount;
2412     jreclist_init(mp, &jreclist, &jreccache, JTYPE_MKNOD);
2413     error = vop_journal_operate_ap(&ap->a_head);
2414     if (error == 0) {
2415 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2416 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2417 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2418 	    save = jrecord_push(jrec, JTYPE_REDO);
2419 	    jrecord_write_vattr(jrec, ap->a_vap);
2420 	    jrecord_pop(jrec, save);
2421 	    if (*ap->a_vpp)
2422 		jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2423 	}
2424     }
2425     jreclist_done(&jreclist, error);
2426     return (error);
2427 }
2428 
2429 /*
2430  * Journal vop_nlink { a_ncp, a_vp, a_cred }
2431  */
2432 static
2433 int
2434 journal_nlink(struct vop_nlink_args *ap)
2435 {
2436     struct jrecord_list jreclist;
2437     struct jrecord jreccache;
2438     struct jrecord *jrec;
2439     struct mount *mp;
2440     void *save;
2441     int error;
2442 
2443     mp = ap->a_head.a_ops->vv_mount;
2444     jreclist_init(mp, &jreclist, &jreccache, JTYPE_LINK);
2445     error = vop_journal_operate_ap(&ap->a_head);
2446     if (error == 0) {
2447 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2448 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2449 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2450 	    /* XXX PATH to VP and inode number */
2451 	    /* XXX this call may not record the correct path when
2452 	     * multiple paths are available */
2453 	    save = jrecord_push(jrec, JTYPE_REDO);
2454 	    jrecord_write_vnode_link(jrec, ap->a_vp, ap->a_ncp);
2455 	    jrecord_pop(jrec, save);
2456 	}
2457     }
2458     jreclist_done(&jreclist, error);
2459     return (error);
2460 }
2461 
2462 /*
2463  * Journal vop_symlink { a_ncp, a_vpp, a_cred, a_vap, a_target }
2464  */
2465 static
2466 int
2467 journal_nsymlink(struct vop_nsymlink_args *ap)
2468 {
2469     struct jrecord_list jreclist;
2470     struct jrecord jreccache;
2471     struct jrecord *jrec;
2472     struct mount *mp;
2473     void *save;
2474     int error;
2475 
2476     mp = ap->a_head.a_ops->vv_mount;
2477     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SYMLINK);
2478     error = vop_journal_operate_ap(&ap->a_head);
2479     if (error == 0) {
2480 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2481 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2482 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2483 	    save = jrecord_push(jrec, JTYPE_REDO);
2484 	    jrecord_leaf(jrec, JLEAF_SYMLINKDATA,
2485 			ap->a_target, strlen(ap->a_target));
2486 	    jrecord_pop(jrec, save);
2487 	    if (*ap->a_vpp)
2488 		jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2489 	}
2490     }
2491     jreclist_done(&jreclist, error);
2492     return (error);
2493 }
2494 
2495 /*
2496  * Journal vop_nwhiteout { a_ncp, a_cred, a_flags }
2497  */
2498 static
2499 int
2500 journal_nwhiteout(struct vop_nwhiteout_args *ap)
2501 {
2502     struct jrecord_list jreclist;
2503     struct jrecord jreccache;
2504     struct jrecord *jrec;
2505     struct mount *mp;
2506     int error;
2507 
2508     mp = ap->a_head.a_ops->vv_mount;
2509     jreclist_init(mp, &jreclist, &jreccache, JTYPE_WHITEOUT);
2510     error = vop_journal_operate_ap(&ap->a_head);
2511     if (error == 0) {
2512 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2513 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2514 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2515 	}
2516     }
2517     jreclist_done(&jreclist, error);
2518     return (error);
2519 }
2520 
2521 /*
2522  * Journal vop_nremove { a_ncp, a_cred }
2523  */
2524 static
2525 int
2526 journal_nremove(struct vop_nremove_args *ap)
2527 {
2528     struct jrecord_list jreclist;
2529     struct jrecord jreccache;
2530     struct jrecord *jrec;
2531     struct mount *mp;
2532     int error;
2533 
2534     mp = ap->a_head.a_ops->vv_mount;
2535     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_REMOVE) &&
2536 	ap->a_ncp->nc_vp
2537     ) {
2538 	jreclist_undo_file(&jreclist, ap->a_ncp->nc_vp,
2539 			   JRUNDO_ALL|JRUNDO_GETVP|JRUNDO_CONDLINK, 0, -1);
2540     }
2541     error = vop_journal_operate_ap(&ap->a_head);
2542     if (error == 0) {
2543 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2544 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2545 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2546 	}
2547     }
2548     jreclist_done(&jreclist, error);
2549     return (error);
2550 }
2551 
2552 /*
2553  * Journal vop_nmkdir { a_ncp, a_vpp, a_cred, a_vap }
2554  */
2555 static
2556 int
2557 journal_nmkdir(struct vop_nmkdir_args *ap)
2558 {
2559     struct jrecord_list jreclist;
2560     struct jrecord jreccache;
2561     struct jrecord *jrec;
2562     struct mount *mp;
2563     int error;
2564 
2565     mp = ap->a_head.a_ops->vv_mount;
2566     jreclist_init(mp, &jreclist, &jreccache, JTYPE_MKDIR);
2567     error = vop_journal_operate_ap(&ap->a_head);
2568     if (error == 0) {
2569 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2570 #if 0
2571 	    if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
2572 		jrecord_write_audit(jrec);
2573 	    }
2574 #endif
2575 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2576 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2577 	    jrecord_write_vattr(jrec, ap->a_vap);
2578 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2579 	    if (*ap->a_vpp)
2580 		jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2581 	}
2582     }
2583     jreclist_done(&jreclist, error);
2584     return (error);
2585 }
2586 
2587 /*
2588  * Journal vop_nrmdir { a_ncp, a_cred }
2589  */
2590 static
2591 int
2592 journal_nrmdir(struct vop_nrmdir_args *ap)
2593 {
2594     struct jrecord_list jreclist;
2595     struct jrecord jreccache;
2596     struct jrecord *jrec;
2597     struct mount *mp;
2598     int error;
2599 
2600     mp = ap->a_head.a_ops->vv_mount;
2601     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_RMDIR)) {
2602 	jreclist_undo_file(&jreclist, ap->a_ncp->nc_vp,
2603 			   JRUNDO_VATTR|JRUNDO_GETVP, 0, 0);
2604     }
2605     error = vop_journal_operate_ap(&ap->a_head);
2606     if (error == 0) {
2607 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2608 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2609 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2610 	}
2611     }
2612     jreclist_done(&jreclist, error);
2613     return (error);
2614 }
2615 
2616 /*
2617  * Journal vop_nrename { a_fncp, a_tncp, a_cred }
2618  */
2619 static
2620 int
2621 journal_nrename(struct vop_nrename_args *ap)
2622 {
2623     struct jrecord_list jreclist;
2624     struct jrecord jreccache;
2625     struct jrecord *jrec;
2626     struct mount *mp;
2627     int error;
2628 
2629     mp = ap->a_head.a_ops->vv_mount;
2630     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_RENAME) &&
2631 	ap->a_tncp->nc_vp
2632     ) {
2633 	jreclist_undo_file(&jreclist, ap->a_tncp->nc_vp,
2634 			   JRUNDO_ALL|JRUNDO_GETVP|JRUNDO_CONDLINK, 0, -1);
2635     }
2636     error = vop_journal_operate_ap(&ap->a_head);
2637     if (error == 0) {
2638 	TAILQ_FOREACH(jrec, &jreclist, user_entry) {
2639 	    jrecord_write_cred(jrec, NULL, ap->a_cred);
2640 	    jrecord_write_path(jrec, JLEAF_PATH1, ap->a_fncp);
2641 	    jrecord_write_path(jrec, JLEAF_PATH2, ap->a_tncp);
2642 	}
2643     }
2644     jreclist_done(&jreclist, error);
2645     return (error);
2646 }
2647 
2648