xref: /dragonfly/sys/kern/vfs_journal.c (revision 029a4939)
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.6 2005/01/09 03:04:51 dillon Exp $
35  */
36 /*
 * Each mount point may have zero or more independently configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread.
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger
 * spooling areas than the memory buffer is able to provide by e.g.
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
 * are maintained by utilizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
 * rather than trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/file.h>
88 
89 #include <machine/limits.h>
90 
91 #include <vm/vm.h>
92 #include <vm/vm_object.h>
93 #include <vm/vm_page.h>
94 #include <vm/vm_pager.h>
95 #include <vm/vnode_pager.h>
96 
97 #include <sys/file2.h>
98 #include <sys/thread2.h>
99 
100 static int journal_attach(struct mount *mp);
101 static void journal_detach(struct mount *mp);
102 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
103 			    const struct mountctl_install_journal *info);
104 static int journal_remove_vfs_journal(struct mount *mp,
105 			    const struct mountctl_remove_journal *info);
106 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
107 static int journal_status_vfs_journal(struct mount *mp,
108 		       const struct mountctl_status_journal *info,
109 		       struct mountctl_journal_ret_status *rstat,
110 		       int buflen, int *res);
111 static void journal_thread(void *info);
112 
113 static void *journal_reserve(struct journal *jo,
114 			    struct journal_rawrecbeg **rawpp,
115 			    int16_t streamid, int bytes);
116 static void *journal_extend(struct journal *jo,
117 			    struct journal_rawrecbeg **rawpp,
118 			    int truncbytes, int bytes, int *newstreamrecp);
119 static void journal_abort(struct journal *jo,
120 			    struct journal_rawrecbeg **rawpp);
121 static void journal_commit(struct journal *jo,
122 			    struct journal_rawrecbeg **rawpp,
123 			    int bytes, int closeout);
124 
125 static void jrecord_init(struct journal *jo,
126 			    struct jrecord *jrec, int16_t streamid);
127 static struct journal_subrecord *jrecord_push(
128 			    struct jrecord *jrec, int16_t rectype);
129 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
130 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
131 			    int16_t rectype, int bytes);
132 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
133 static void jrecord_done(struct jrecord *jrec, int abortit);
134 
135 static void jrecord_write_path(struct jrecord *jrec,
136 			    int16_t rectype, struct namecache *ncp);
137 static void jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat);
138 
139 
140 static int journal_setattr(struct vop_setattr_args *ap);
141 static int journal_write(struct vop_write_args *ap);
142 static int journal_fsync(struct vop_fsync_args *ap);
143 static int journal_putpages(struct vop_putpages_args *ap);
144 static int journal_setacl(struct vop_setacl_args *ap);
145 static int journal_setextattr(struct vop_setextattr_args *ap);
146 static int journal_ncreate(struct vop_ncreate_args *ap);
147 static int journal_nmknod(struct vop_nmknod_args *ap);
148 static int journal_nlink(struct vop_nlink_args *ap);
149 static int journal_nsymlink(struct vop_nsymlink_args *ap);
150 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
151 static int journal_nremove(struct vop_nremove_args *ap);
152 static int journal_nmkdir(struct vop_nmkdir_args *ap);
153 static int journal_nrmdir(struct vop_nrmdir_args *ap);
154 static int journal_nrename(struct vop_nrename_args *ap);
155 
156 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
157     { &vop_default_desc,		vop_journal_operate_ap },
158     { &vop_mountctl_desc,		(void *)journal_mountctl },
159     { &vop_setattr_desc,		(void *)journal_setattr },
160     { &vop_write_desc,			(void *)journal_write },
161     { &vop_fsync_desc,			(void *)journal_fsync },
162     { &vop_putpages_desc,		(void *)journal_putpages },
163     { &vop_setacl_desc,			(void *)journal_setacl },
164     { &vop_setextattr_desc,		(void *)journal_setextattr },
165     { &vop_ncreate_desc,		(void *)journal_ncreate },
166     { &vop_nmknod_desc,			(void *)journal_nmknod },
167     { &vop_nlink_desc,			(void *)journal_nlink },
168     { &vop_nsymlink_desc,		(void *)journal_nsymlink },
169     { &vop_nwhiteout_desc,		(void *)journal_nwhiteout },
170     { &vop_nremove_desc,		(void *)journal_nremove },
171     { &vop_nmkdir_desc,			(void *)journal_nmkdir },
172     { &vop_nrmdir_desc,			(void *)journal_nrmdir },
173     { &vop_nrename_desc,		(void *)journal_nrename },
174     { NULL, NULL }
175 };
176 
/*
 * Kernel malloc types used by this file: M_JOURNAL for struct journal
 * allocations and M_JFIFO for each journal's memory FIFO buffer.
 */
static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
179 
180 int
181 journal_mountctl(struct vop_mountctl_args *ap)
182 {
183     struct mount *mp;
184     int error = 0;
185 
186     mp = ap->a_head.a_ops->vv_mount;
187     KKASSERT(mp);
188 
189     if (mp->mnt_vn_journal_ops == NULL) {
190 	switch(ap->a_op) {
191 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
192 	    error = journal_attach(mp);
193 	    if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
194 		error = EINVAL;
195 	    if (error == 0 && ap->a_fp == NULL)
196 		error = EBADF;
197 	    if (error == 0)
198 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
199 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
200 		journal_detach(mp);
201 	    break;
202 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
203 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
204 	case MOUNTCTL_STATUS_VFS_JOURNAL:
205 	    error = ENOENT;
206 	    break;
207 	default:
208 	    error = EOPNOTSUPP;
209 	    break;
210 	}
211     } else {
212 	switch(ap->a_op) {
213 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
214 	    if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
215 		error = EINVAL;
216 	    if (error == 0 && ap->a_fp == NULL)
217 		error = EBADF;
218 	    if (error == 0)
219 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
220 	    break;
221 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
222 	    if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
223 		error = EINVAL;
224 	    if (error == 0)
225 		error = journal_remove_vfs_journal(mp, ap->a_ctl);
226 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
227 		journal_detach(mp);
228 	    break;
229 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
230 	    if (ap->a_ctllen != 0)
231 		error = EINVAL;
232 	    error = journal_resync_vfs_journal(mp, ap->a_ctl);
233 	    break;
234 	case MOUNTCTL_STATUS_VFS_JOURNAL:
235 	    if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
236 		error = EINVAL;
237 	    if (error == 0) {
238 		error = journal_status_vfs_journal(mp, ap->a_ctl,
239 					ap->a_buf, ap->a_buflen, ap->a_res);
240 	    }
241 	    break;
242 	default:
243 	    error = EOPNOTSUPP;
244 	    break;
245 	}
246     }
247     return (error);
248 }
249 
250 /*
251  * High level mount point setup.  When a
252  */
253 static int
254 journal_attach(struct mount *mp)
255 {
256     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, journal_vnodeop_entries);
257     return(0);
258 }
259 
260 static void
261 journal_detach(struct mount *mp)
262 {
263     if (mp->mnt_vn_journal_ops)
264 	vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
265 }
266 
267 /*
268  * Install a journal on a mount point.  Each journal has an associated worker
269  * thread which is responsible for buffering and spooling the data to the
270  * target.  A mount point may have multiple journals attached to it.  An
271  * initial start record is generated when the journal is associated.
272  */
273 static int
274 journal_install_vfs_journal(struct mount *mp, struct file *fp,
275 			    const struct mountctl_install_journal *info)
276 {
277     struct journal *jo;
278     struct jrecord jrec;
279     int error = 0;
280     int size;
281 
282     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
283     bcopy(info->id, jo->id, sizeof(jo->id));
284     jo->flags = info->flags & ~(MC_JOURNAL_ACTIVE | MC_JOURNAL_STOP_REQ);
285 
286     /*
287      * Memory FIFO size, round to nearest power of 2
288      */
289     if (info->membufsize) {
290 	if (info->membufsize < 65536)
291 	    size = 65536;
292 	else if (info->membufsize > 128 * 1024 * 1024)
293 	    size = 128 * 1024 * 1024;
294 	else
295 	    size = (int)info->membufsize;
296     } else {
297 	size = 1024 * 1024;
298     }
299     jo->fifo.size = 1;
300     while (jo->fifo.size < size)
301 	jo->fifo.size <<= 1;
302 
303     /*
304      * Other parameters.  If not specified the starting transaction id
305      * will be the current date.
306      */
307     if (info->transid) {
308 	jo->transid = info->transid;
309     } else {
310 	struct timespec ts;
311 	getnanotime(&ts);
312 	jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
313     }
314 
315     jo->fp = fp;
316 
317     /*
318      * Allocate the memory FIFO
319      */
320     jo->fifo.mask = jo->fifo.size - 1;
321     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
322     if (jo->fifo.membase == NULL)
323 	error = ENOMEM;
324 
325     /*
326      * Create the worker thread and generate the association record.
327      */
328     if (error) {
329 	free(jo, M_JOURNAL);
330     } else {
331 	fhold(fp);
332 	jo->flags |= MC_JOURNAL_ACTIVE;
333 	lwkt_create(journal_thread, jo, NULL, &jo->thread,
334 			TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
335 	lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
336 	lwkt_schedule(&jo->thread);
337 
338 	jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
339 	jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
340 	jrecord_done(&jrec, 0);
341 	TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
342     }
343     return(error);
344 }
345 
346 /*
347  * Disassociate a journal from a mount point and terminate its worker thread.
348  * A final termination record is written out before the file pointer is
349  * dropped.
350  */
351 static int
352 journal_remove_vfs_journal(struct mount *mp,
353 			   const struct mountctl_remove_journal *info)
354 {
355     struct journal *jo;
356     struct jrecord jrec;
357     int error;
358 
359     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
360 	if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
361 	    break;
362     }
363     if (jo) {
364 	error = 0;
365 	TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
366 
367 	jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
368 	jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
369 	jrecord_done(&jrec, 0);
370 
371 	jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
372 	wakeup(&jo->fifo);
373 	while (jo->flags & MC_JOURNAL_ACTIVE) {
374 	    tsleep(jo, 0, "jwait", 0);
375 	}
376 	lwkt_free_thread(&jo->thread); /* XXX SMP */
377 	if (jo->fp)
378 	    fdrop(jo->fp, curthread);
379 	if (jo->fifo.membase)
380 	    free(jo->fifo.membase, M_JFIFO);
381 	free(jo, M_JOURNAL);
382     } else {
383 	error = EINVAL;
384     }
385     return (error);
386 }
387 
/*
 * Resynchronize a journal.  Not implemented: both arguments are ignored and
 * EINVAL is always returned.
 */
static int
journal_resync_vfs_journal(struct mount *mp, const void *ctl)
{
    (void)mp;
    (void)ctl;

    return (EINVAL);
}
393 
394 static int
395 journal_status_vfs_journal(struct mount *mp,
396 		       const struct mountctl_status_journal *info,
397 		       struct mountctl_journal_ret_status *rstat,
398 		       int buflen, int *res)
399 {
400     struct journal *jo;
401     int error = 0;
402     int index;
403 
404     index = 0;
405     *res = 0;
406     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
407 	if (info->index == MC_JOURNAL_INDEX_ID) {
408 	    if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
409 		continue;
410 	} else if (info->index >= 0) {
411 	    if (info->index < index)
412 		continue;
413 	} else if (info->index != MC_JOURNAL_INDEX_ALL) {
414 	    continue;
415 	}
416 	if (buflen < sizeof(*rstat)) {
417 	    if (*res)
418 		rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
419 	    else
420 		error = EINVAL;
421 	    break;
422 	}
423 	bzero(rstat, sizeof(*rstat));
424 	rstat->recsize = sizeof(*rstat);
425 	bcopy(jo->id, rstat->id, sizeof(jo->id));
426 	rstat->index = index;
427 	rstat->membufsize = jo->fifo.size;
428 	rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
429 	rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
430 	rstat->bytessent = jo->total_acked;
431 	++rstat;
432 	++index;
433 	*res += sizeof(*rstat);
434 	buflen -= sizeof(*rstat);
435     }
436     return(error);
437 }
438 /*
439  * The per-journal worker thread is responsible for writing out the
440  * journal's FIFO to the target stream.
441  */
442 static void
443 journal_thread(void *info)
444 {
445     struct journal *jo = info;
446     struct journal_rawrecbeg *rawp;
447     int bytes;
448     int error;
449     int avail;
450     int res;
451 
452     for (;;) {
453 	/*
454 	 * Calculate the number of bytes available to write.  This buffer
455 	 * area may contain reserved records so we can't just write it out
456 	 * without further checks.
457 	 */
458 	bytes = jo->fifo.windex - jo->fifo.rindex;
459 
460 	/*
461 	 * sleep if no bytes are available or if an incomplete record is
462 	 * encountered (it needs to be filled in before we can write it
463 	 * out), and skip any pad records that we encounter.
464 	 */
465 	if (bytes == 0) {
466 	    if (jo->flags & MC_JOURNAL_STOP_REQ)
467 		break;
468 	    tsleep(&jo->fifo, 0, "jfifo", hz);
469 	    continue;
470 	}
471 	rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
472 	if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
473 	    tsleep(&jo->fifo, 0, "jpad", hz);
474 	    continue;
475 	}
476 	if (rawp->streamid == JREC_STREAMID_PAD) {
477 	    jo->fifo.rindex += (rawp->recsize + 15) & ~15;
478 	    KKASSERT(jo->fifo.windex - jo->fifo.rindex > 0);
479 	    continue;
480 	}
481 
482 	/*
483 	 * Figure out how much we can write out, beware the buffer wrap
484 	 * case.
485 	 */
486 	res = 0;
487 	avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
488 	while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
489 	    res += (rawp->recsize + 15) & ~15;
490 	    if (res >= avail) {
491 		KKASSERT(res == avail);
492 		break;
493 	    }
494 	}
495 
496 	/*
497 	 * Issue the write and deal with any errors or other conditions.
498 	 * For now assume blocking I/O.  Since we are record-aware the
499 	 * code cannot yet handle partial writes.
500 	 *
501 	 * XXX EWOULDBLOCK/NBIO
502 	 * XXX notification on failure
503 	 * XXX two-way acknowledgement stream in the return direction / xindex
504 	 */
505 	printf("write @%d,%d\n", jo->fifo.rindex & jo->fifo.mask, bytes);
506 	bytes = res;
507 	error = fp_write(jo->fp,
508 			jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
509 			bytes, &res);
510 	if (error) {
511 	    printf("journal_thread(%s) write, error %d\n", jo->id, error);
512 	    /* XXX */
513 	} else {
514 	    KKASSERT(res == bytes);
515 	    printf("journal_thread(%s) write %d\n", jo->id, res);
516 	}
517 
518 	/*
519 	 * Advance rindex.  XXX for now also advance xindex, which will
520 	 * eventually be advanced when the target acknowledges the sequence
521 	 * space.
522 	 */
523 	jo->fifo.rindex += bytes;
524 	jo->fifo.xindex += bytes;
525 	jo->total_acked += bytes;
526 	if (jo->flags & MC_JOURNAL_WWAIT) {
527 	    jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
528 	    wakeup(&jo->fifo.windex);
529 	}
530     }
531     jo->flags &= ~MC_JOURNAL_ACTIVE;
532     wakeup(jo);
533     wakeup(&jo->fifo.windex);
534 }
535 
536 static __inline
537 void
538 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
539 {
540     struct journal_rawrecend *rendp;
541 
542     KKASSERT((recsize & 15) == 0 && recsize >= 16);
543 
544     rawp->begmagic = JREC_BEGMAGIC;
545     rawp->streamid = JREC_STREAMID_PAD;
546     rawp->recsize = recsize;	/* must be 16-byte aligned */
547     rawp->seqno = 0;
548     /*
549      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
550      * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
551      * hopefully cause the compiler to not make any assumptions.
552      */
553     cpu_mb1();
554     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
555     rendp->endmagic = JREC_ENDMAGIC;
556     rendp->check = 0;
557     rendp->recsize = rawp->recsize;
558 }
559 
560 /*
561  * Wake up the worker thread if the FIFO is more then half full or if
562  * someone is waiting for space to be freed up.  Otherwise let the
563  * heartbeat deal with it.  Being able to avoid waking up the worker
564  * is the key to the journal's cpu efficiency.
565  */
566 static __inline
567 void
568 journal_commit_wakeup(struct journal *jo)
569 {
570     int avail;
571 
572     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
573     KKASSERT(avail >= 0);
574     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
575 	wakeup(&jo->fifo);
576 }
577 
578 /*
579  * Create a new BEGIN stream record with the specified streamid and the
580  * specified amount of payload space.  *rawpp will be set to point to the
581  * base of the new stream record and a pointer to the base of the payload
582  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
583  * making this call.
584  *
585  * A stream can be extended, aborted, or committed by other API calls
586  * below.  This may result in a sequence of potentially disconnected
587  * stream records to be output to the journaling target.  The first record
588  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
589  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
590  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
591  * up being the same as the first, in which case the bits are all set in
592  * the first record.
593  *
594  * The stream record is created in an incomplete state by setting the begin
595  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
596  * flushing the fifo past our record until we have finished populating it.
597  * Other threads can reserve and operate on their own space without stalling
598  * but the stream output will stall until we have completed operations.  The
599  * memory FIFO is intended to be large enough to absorb such situations
600  * without stalling out other threads.
601  */
602 static
603 void *
604 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
605 		int16_t streamid, int bytes)
606 {
607     struct journal_rawrecbeg *rawp;
608     int avail;
609     int availtoend;
610     int req;
611 
612     /*
613      * Add header and trailer overheads to the passed payload.  Note that
614      * the passed payload size need not be aligned in any way.
615      */
616     bytes += sizeof(struct journal_rawrecbeg);
617     bytes += sizeof(struct journal_rawrecend);
618 
619     for (;;) {
620 	/*
621 	 * First, check boundary conditions.  If the request would wrap around
622 	 * we have to skip past the ending block and return to the beginning
623 	 * of the FIFO's buffer.  Calculate 'req' which is the actual number
624 	 * of bytes being reserved, including wrap-around dead space.
625 	 *
626 	 * Note that availtoend is not truncated to avail and so cannot be
627 	 * used to determine whether the reservation is possible by itself.
628 	 * Also, since all fifo ops are 16-byte aligned, we can check
629 	 * the size before calculating the aligned size.
630 	 */
631 	availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
632 	if (bytes > availtoend)
633 	    req = bytes + availtoend;	/* add pad to end */
634 	else
635 	    req = bytes;
636 
637 	/*
638 	 * Next calculate the total available space and see if it is
639 	 * sufficient.  We cannot overwrite previously buffered data
640 	 * past xindex because otherwise we would not be able to restart
641 	 * a broken link at the target's last point of commit.
642 	 */
643 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
644 	KKASSERT(avail >= 0 && (avail & 15) == 0);
645 
646 	if (avail < req) {
647 	    /* XXX MC_JOURNAL_STOP_IMM */
648 	    jo->flags |= MC_JOURNAL_WWAIT;
649 	    tsleep(&jo->fifo.windex, 0, "jwrite", 0);
650 	    continue;
651 	}
652 
653 	/*
654 	 * Create a pad record for any dead space and create an incomplete
655 	 * record for the live space, then return a pointer to the
656 	 * contiguous buffer space that was requested.
657 	 *
658 	 * NOTE: The worker thread will not flush past an incomplete
659 	 * record, so the reserved space can be filled in at-will.  The
660 	 * journaling code must also be aware the reserved sections occuring
661 	 * after this one will also not be written out even if completed
662 	 * until this one is completed.
663 	 */
664 	rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
665 	if (req != bytes) {
666 	    journal_build_pad(rawp, req - bytes);
667 	    rawp = (void *)jo->fifo.membase;
668 	}
669 	rawp->begmagic = JREC_INCOMPLETEMAGIC;	/* updated by abort/commit */
670 	rawp->recsize = bytes;			/* (unaligned size) */
671 	rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
672 	rawp->seqno = 0;			/* set by caller */
673 
674 	/*
675 	 * Issue a memory barrier to guarentee that the record data has been
676 	 * properly initialized before we advance the write index and return
677 	 * a pointer to the reserved record.  Otherwise the worker thread
678 	 * could accidently run past us.
679 	 *
680 	 * Note that stream records are always 16-byte aligned.
681 	 */
682 	cpu_mb1();
683 	jo->fifo.windex += (req + 15) & ~15;
684 	*rawpp = rawp;
685 	return(rawp + 1);
686     }
687     /* not reached */
688     *rawpp = NULL;
689     return(NULL);
690 }
691 
692 /*
693  * Extend a previous reservation by the specified number of payload bytes.
694  * If it is not possible to extend the existing reservation due to either
695  * another thread having reserved space after us or due to a boundary
696  * condition, the current reservation will be committed and possibly
697  * truncated and a new reservation with the specified payload size will
698  * be created. *rawpp is set to the new reservation in this case but the
699  * caller cannot depend on a comparison with the old rawp to determine if
700  * this case occurs because we could end up using the same memory FIFO
701  * offset for the new stream record.
702  *
703  * In either case this function will return a pointer to the base of the
704  * extended payload space.
705  *
706  * If a new stream block is created the caller needs to recalculate payload
707  * byte counts, if the same stream block is used the caller needs to extend
708  * its current notion of the payload byte count.
709  */
710 static void *
711 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp,
712 		int truncbytes, int bytes, int *newstreamrecp)
713 {
714     struct journal_rawrecbeg *rawp;
715     int16_t streamid;
716     int availtoend;
717     int avail;
718     int osize;
719     int nsize;
720     int wbase;
721     void *rptr;
722 
723     *newstreamrecp = 0;
724     rawp = *rawpp;
725     osize = (rawp->recsize + 15) & ~15;
726     nsize = (rawp->recsize + bytes + 15) & ~15;
727     wbase = (char *)rawp - jo->fifo.membase;
728 
729     /*
730      * If the aligned record size does not change we can trivially extend
731      * the record.
732      */
733     if (nsize == osize) {
734 	rawp->recsize += bytes;
735 	return((char *)rawp + rawp->recsize - bytes);
736     }
737 
738     /*
739      * If the fifo's write index hasn't been modified since we made the
740      * reservation and we do not hit any boundary conditions, we can
741      * trivially extend the record.
742      */
743     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
744 	availtoend = jo->fifo.size - wbase;
745 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
746 	KKASSERT((availtoend & 15) == 0);
747 	KKASSERT((avail & 15) == 0);
748 	if (nsize <= avail && nsize <= availtoend) {
749 	    jo->fifo.windex += nsize - osize;
750 	    rawp->recsize += bytes;
751 	    return((char *)rawp + rawp->recsize - bytes);
752 	}
753     }
754 
755     /*
756      * It was not possible to extend the buffer.  Commit the current
757      * buffer and create a new one.  We manually clear the BEGIN mark that
758      * journal_reserve() creates (because this is a continuing record, not
759      * the start of a new stream).
760      */
761     streamid = rawp->streamid & JREC_STREAMID_MASK;
762     journal_commit(jo, rawpp, truncbytes, 0);
763     rptr = journal_reserve(jo, rawpp, streamid, bytes);
764     rawp = *rawpp;
765     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
766     *newstreamrecp = 1;
767     return(rptr);
768 }
769 
770 /*
771  * Abort a journal record.  If the transaction record represents a stream
772  * BEGIN and we can reverse the fifo's write index we can simply reverse
773  * index the entire record, as if it were never reserved in the first place.
774  *
775  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
776  * with the payload truncated to 0 bytes.
777  */
778 static void
779 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
780 {
781     struct journal_rawrecbeg *rawp;
782     int osize;
783 
784     rawp = *rawpp;
785     osize = (rawp->recsize + 15) & ~15;
786 
787     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
788 	(jo->fifo.windex & jo->fifo.mask) ==
789 	 (char *)rawp - jo->fifo.membase + osize)
790     {
791 	jo->fifo.windex -= osize;
792 	*rawpp = NULL;
793     } else {
794 	rawp->streamid |= JREC_STREAMCTL_ABORTED;
795 	journal_commit(jo, rawpp, 0, 1);
796     }
797 }
798 
799 /*
800  * Commit a journal record and potentially truncate it to the specified
801  * number of payload bytes.  If you do not want to truncate the record,
802  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
803  * field includes header and trailer and will not be correct.  Note that
804  * passing 0 will truncate the entire data payload of the record.
805  *
806  * The logical stream is terminated by this function.
807  *
808  * If truncation occurs, and it is not possible to physically optimize the
809  * memory FIFO due to other threads having reserved space after ours,
810  * the remaining reserved space will be covered by a pad record.
811  */
812 static void
813 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
814 		int bytes, int closeout)
815 {
816     struct journal_rawrecbeg *rawp;
817     struct journal_rawrecend *rendp;
818     int osize;
819     int nsize;
820 
821     rawp = *rawpp;
822     *rawpp = NULL;
823 
824     KKASSERT((char *)rawp >= jo->fifo.membase &&
825 	     (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
826     KKASSERT(((intptr_t)rawp & 15) == 0);
827 
828     /*
829      * Truncate the record if requested.  If the FIFO write index as still
830      * at the end of our record we can optimally backindex it.  Otherwise
831      * we have to insert a pad record.
832      *
833      * We calculate osize which is the 16-byte-aligned original recsize.
834      * We calculate nsize which is the 16-byte-aligned new recsize.
835      *
836      * Due to alignment issues or in case the passed truncation bytes is
837      * the same as the original payload, windex will be equal to nindex.
838      */
839     if (bytes >= 0) {
840 	KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
841 	osize = (rawp->recsize + 15) & ~15;
842 	rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
843 			sizeof(struct journal_rawrecend);
844 	nsize = (rawp->recsize + 15) & ~15;
845 	if (osize == nsize) {
846 	    /* do nothing */
847 	} else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
848 	    /* we are able to backindex the fifo */
849 	    jo->fifo.windex -= osize - nsize;
850 	} else {
851 	    /* we cannot backindex the fifo, emplace a pad in the dead space */
852 	    journal_build_pad((void *)((char *)rawp + osize), osize - nsize);
853 	}
854     }
855 
856     /*
857      * Fill in the trailer.  Note that unlike pad records, the trailer will
858      * never overlap the header.
859      */
860     rendp = (void *)((char *)rawp +
861 	    ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
862     rendp->endmagic = JREC_ENDMAGIC;
863     rendp->recsize = rawp->recsize;
864     rendp->check = 0;		/* XXX check word, disabled for now */
865 
866     /*
867      * Fill in begmagic last.  This will allow the worker thread to proceed.
868      * Use a memory barrier to guarentee write ordering.  Mark the stream
869      * as terminated if closeout is set.  This is the typical case.
870      */
871     if (closeout)
872 	rawp->streamid |= JREC_STREAMCTL_END;
873     cpu_mb1();			/* memory barrier */
874     rawp->begmagic = JREC_BEGMAGIC;
875 
876     journal_commit_wakeup(jo);
877 }
878 
879 /************************************************************************
880  *			TRANSACTION SUPPORT ROUTINES			*
881  ************************************************************************
882  *
883  * JRECORD_*() - routines to create subrecord transactions and embed them
884  *		 in the logical streams managed by the journal_*() routines.
885  */
886 
887 static int16_t sid = JREC_STREAMID_JMIN;
888 
889 /*
890  * Initialize the passed jrecord structure and start a new stream transaction
891  * by reserving an initial build space in the journal's memory FIFO.
892  */
893 static void
894 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
895 {
896     bzero(jrec, sizeof(*jrec));
897     jrec->jo = jo;
898     if (streamid < 0) {
899 	streamid = sid++;	/* XXX need to track stream ids! */
900 	if (sid == JREC_STREAMID_JMAX)
901 	    sid = JREC_STREAMID_JMIN;
902     }
903     jrec->streamid = streamid;
904     jrec->stream_residual = JREC_DEFAULTSIZE;
905     jrec->stream_reserved = jrec->stream_residual;
906     jrec->stream_ptr =
907 	journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
908 }
909 
910 /*
911  * Push a recursive record type.  All pushes should have matching pops.
912  * The old parent is returned and the newly pushed record becomes the
913  * new parent.  Note that the old parent's pointer may already be invalid
914  * or may become invalid if jrecord_write() had to build a new stream
915  * record, so the caller should not mess with the returned pointer in
916  * any way other than to save it.
917  */
918 static
919 struct journal_subrecord *
920 jrecord_push(struct jrecord *jrec, int16_t rectype)
921 {
922     struct journal_subrecord *save;
923 
924     save = jrec->parent;
925     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
926     jrec->last = NULL;
927     KKASSERT(jrec->parent != NULL);
928     ++jrec->pushcount;
929     ++jrec->pushptrgood;	/* cleared on flush */
930     return(save);
931 }
932 
933 /*
934  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
935  * on the last record written within the subtransaction.  If the last
936  * record written is not accessible or if the subtransaction is empty,
937  * we must write out a pad record with JMASK_LAST set before popping.
938  *
939  * When popping a subtransaction the parent record's recsize field
940  * will be properly set.  If the parent pointer is no longer valid
941  * (which can occur if the data has already been flushed out to the
942  * stream), the protocol spec allows us to leave it 0.
943  *
944  * The saved parent pointer which we restore may or may not be valid,
945  * and if not valid may or may not be NULL, depending on the value
946  * of pushptrgood.
947  */
948 static void
949 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
950 {
951     struct journal_subrecord *last;
952 
953     KKASSERT(jrec->pushcount > 0);
954     KKASSERT(jrec->residual == 0);
955 
956     /*
957      * Set JMASK_LAST on the last record we wrote at the current
958      * level.  If last is NULL we either no longer have access to the
959      * record or the subtransaction was empty and we must write out a pad
960      * record.
961      */
962     if ((last = jrec->last) == NULL) {
963 	jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
964 	last = jrec->last;	/* reload after possible flush */
965     } else {
966 	last->rectype |= JMASK_LAST;
967     }
968 
969     /*
970      * pushptrgood tells us how many levels of parent record pointers
971      * are valid.  The jrec only stores the current parent record pointer
972      * (and it is only valid if pushptrgood != 0).  The higher level parent
973      * record pointers are saved by the routines calling jrecord_push() and
974      * jrecord_pop().  These pointers may become stale and we determine
975      * that fact by tracking the count of valid parent pointers with
976      * pushptrgood.  Pointers become invalid when their related stream
977      * record gets pushed out.
978      *
979      * [parentA]
980      *	  [node X]
981      *    [parentB]
982      *	     [node Y]
983      *	     [node Z]
984      *    (pop B)	see NOTE B
985      * (pop A)		see NOTE A
986      *
987      * NOTE B:	This pop sets LAST in node Z if the node is still accessible,
988      *		else a PAD record is appended and LAST is set in that.
989      *
990      *		This pop sets the record size in parentB if parentB is still
991      *		accessible, else the record size is left 0 (the scanner must
992      *		deal with that).
993      *
994      *		This pop sets the new 'last' record to parentB, the pointer
995      *		to which may or may not still be accessible.
996      *
997      * NOTE A:	This pop sets LAST in parentB if the node is still accessible,
998      *		else a PAD record is appended and LAST is set in that.
999      *
1000      *		This pop sets the record size in parentA if parentA is still
1001      *		accessible, else the record size is left 0 (the scanner must
1002      *		deal with that).
1003      *
1004      *		This pop sets the new 'last' record to parentA, the pointer
1005      *		to which may or may not still be accessible.
1006      *
1007      * Also note that the last record in the stream transaction, which in
1008      * the above example is parentA, does not currently have the LAST bit
1009      * set.
1010      *
1011      * The current parent becomes the last record relative to the
1012      * saved parent passed into us.  It's validity is based on
1013      * whether pushptrgood is non-zero prior to decrementing.  The saved
1014      * parent becomes the new parent, and its validity is based on whether
1015      * pushptrgood is non-zero after decrementing.
1016      *
1017      * The old jrec->parent may be NULL if it is no longer accessible.
1018      * If pushptrgood is non-zero, however, it is guarenteed to not
1019      * be NULL (since no flush occured).
1020      */
1021     jrec->last = jrec->parent;
1022     --jrec->pushcount;
1023     if (jrec->pushptrgood) {
1024 	KKASSERT(jrec->last != NULL && last != NULL);
1025 	if (--jrec->pushptrgood == 0) {
1026 	    jrec->parent = NULL;	/* 'save' contains garbage or NULL */
1027 	} else {
1028 	    KKASSERT(save != NULL);
1029 	    jrec->parent = save;	/* 'save' must not be NULL */
1030 	}
1031 
1032 	/*
1033 	 * Set the record size in the old parent.  'last' still points to
1034 	 * the original last record in the subtransaction being popped,
1035 	 * jrec->last points to the old parent (which became the last
1036 	 * record relative to the new parent being popped into).
1037 	 */
1038 	jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1039     } else {
1040 	jrec->parent = NULL;
1041 	KKASSERT(jrec->last == NULL);
1042     }
1043 }
1044 
1045 /*
1046  * Write a leaf record out and return a pointer to its base.  The leaf
1047  * record may contain potentially megabytes of data which is supplied
1048  * in jrecord_data() calls.  The exact amount must be specified in this
1049  * call.
1050  */
1051 static
1052 struct journal_subrecord *
1053 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1054 {
1055     struct journal_subrecord *last;
1056     int pusheditout;
1057 
1058     /*
1059      * Try to catch some obvious errors.  Nesting records must specify a
1060      * size of 0, and there should be no left-overs from previous operations
1061      * (such as incomplete data writeouts).
1062      */
1063     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1064     KKASSERT(jrec->residual == 0);
1065 
1066     /*
1067      * Check to see if the current stream record has enough room for
1068      * the new subrecord header.  If it doesn't we extend the current
1069      * stream record.
1070      *
1071      * This may have the side effect of pushing out the current stream record
1072      * and creating a new one.  We must adjust our stream tracking fields
1073      * accordingly.
1074      */
1075     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1076 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1077 				jrec->stream_reserved - jrec->stream_residual,
1078 				JREC_DEFAULTSIZE, &pusheditout);
1079 	if (pusheditout) {
1080 	    jrec->stream_reserved = JREC_DEFAULTSIZE;
1081 	    jrec->stream_residual = JREC_DEFAULTSIZE;
1082 	    jrec->parent = NULL;	/* no longer accessible */
1083 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1084 	} else {
1085 	    jrec->stream_reserved += JREC_DEFAULTSIZE;
1086 	    jrec->stream_residual += JREC_DEFAULTSIZE;
1087 	}
1088     }
1089     last = (void *)jrec->stream_ptr;
1090     last->rectype = rectype;
1091     last->reserved = 0;
1092     last->recsize = sizeof(struct journal_subrecord) + bytes;
1093     jrec->last = last;
1094     jrec->residual = bytes;		/* remaining data to be posted */
1095     jrec->residual_align = -bytes & 7;	/* post-data alignment required */
1096     return(last);
1097 }
1098 
1099 /*
1100  * Write out the data associated with a leaf record.  Any number of calls
1101  * to this routine may be made as long as the byte count adds up to the
1102  * amount originally specified in jrecord_write().
1103  *
1104  * The act of writing out the leaf data may result in numerous stream records
1105  * being pushed out.   Callers should be aware that even the associated
1106  * subrecord header may become inaccessible due to stream record pushouts.
1107  */
1108 static void
1109 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1110 {
1111     int pusheditout;
1112     int extsize;
1113 
1114     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1115 
1116     /*
1117      * Push out stream records as long as there is insufficient room to hold
1118      * the remaining data.
1119      */
1120     while (jrec->stream_residual < bytes) {
1121 	/*
1122 	 * Fill in any remaining space in the current stream record.
1123 	 */
1124 	bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1125 	buf = (const char *)buf + jrec->stream_residual;
1126 	bytes -= jrec->stream_residual;
1127 	/*jrec->stream_ptr += jrec->stream_residual;*/
1128 	jrec->stream_residual = 0;
1129 	jrec->residual -= jrec->stream_residual;
1130 
1131 	/*
1132 	 * Try to extend the current stream record, but no more then 1/4
1133 	 * the size of the FIFO.
1134 	 */
1135 	extsize = jrec->jo->fifo.size >> 2;
1136 	if (extsize > bytes)
1137 	    extsize = (bytes + 15) & ~15;
1138 
1139 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1140 				jrec->stream_reserved - jrec->stream_residual,
1141 				extsize, &pusheditout);
1142 	if (pusheditout) {
1143 	    jrec->stream_reserved = extsize;
1144 	    jrec->stream_residual = extsize;
1145 	    jrec->parent = NULL;	/* no longer accessible */
1146 	    jrec->last = NULL;		/* no longer accessible */
1147 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1148 	} else {
1149 	    jrec->stream_reserved += extsize;
1150 	    jrec->stream_residual += extsize;
1151 	}
1152     }
1153 
1154     /*
1155      * Push out any remaining bytes into the current stream record.
1156      */
1157     if (bytes) {
1158 	bcopy(buf, jrec->stream_ptr, bytes);
1159 	jrec->stream_ptr += bytes;
1160 	jrec->stream_residual -= bytes;
1161 	jrec->residual -= bytes;
1162     }
1163 
1164     /*
1165      * Handle data alignment requirements for the subrecord.  Because the
1166      * stream record's data space is more strictly aligned, it must already
1167      * have sufficient space to hold any subrecord alignment slop.
1168      */
1169     if (jrec->residual == 0 && jrec->residual_align) {
1170 	KKASSERT(jrec->residual_align <= jrec->stream_residual);
1171 	bzero(jrec->stream_ptr, jrec->residual_align);
1172 	jrec->stream_ptr += jrec->residual_align;
1173 	jrec->stream_residual -= jrec->residual_align;
1174 	jrec->residual_align = 0;
1175     }
1176 }
1177 
1178 /*
1179  * We are finished with a transaction.  If abortit is not set then we must
1180  * be at the top level with no residual subrecord data left to output.
1181  * If abortit is set then we can be in any state.
1182  *
1183  * The stream record will be committed or aborted as specified and jrecord
1184  * resources will be cleaned up.
1185  */
1186 static void
1187 jrecord_done(struct jrecord *jrec, int abortit)
1188 {
1189     KKASSERT(jrec->rawp != NULL);
1190 
1191     if (abortit) {
1192 	journal_abort(jrec->jo, &jrec->rawp);
1193     } else {
1194 	KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1195 	journal_commit(jrec->jo, &jrec->rawp,
1196 			jrec->stream_reserved - jrec->stream_residual, 1);
1197     }
1198 
1199     /*
1200      * jrec should not be used beyond this point without another init,
1201      * but clean up some fields to ensure that we panic if it is.
1202      *
1203      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1204      */
1205     jrec->jo = NULL;
1206     jrec->stream_ptr = NULL;
1207 }
1208 
1209 /************************************************************************
1210  *			LEAF RECORD SUPPORT ROUTINES			*
1211  ************************************************************************
1212  *
1213  * These routines create leaf subrecords representing common filesystem
1214  * structures.
1215  */
1216 
1217 static void
1218 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1219 {
1220 }
1221 
/*
 * Placeholder for writing a leaf subrecord describing the attributes in
 * 'vat'.
 *
 * XXX not implemented yet: the body is empty and nothing is written.
 */
static void
jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
{
    /* nothing yet */
}
1226 
1227 /************************************************************************
1228  *			JOURNAL VNOPS					*
1229  ************************************************************************
1230  *
1231  * These are function shims replacing the normal filesystem ops.  We become
1232  * responsible for calling the underlying filesystem ops.  We have the choice
1233  * of executing the underlying op first and then generating the journal entry,
1234  * or starting the journal entry, executing the underlying op, and then
1235  * either completing or aborting it.
1236  *
1237  * The journal is supposed to be a high-level entity, which generally means
1238  * identifying files by name rather than by inode.  Supplying both allows
1239  * the journal to be used both for inode-number-compatible 'mirrors' and
1240  * for simple filesystem replication.
1241  *
1242  * Writes are particularly difficult to deal with because a single write may
1243  * represent a hundred megabyte buffer or more, and both writes and truncations
1244  * require the 'old' data to be written out as well as the new data if the
1245  * log is reversible.  Other issues:
1246  *
1247  * - How to deal with operations on unlinked files (no path available),
1248  *   but which may still be filesystem visible due to hard links.
1249  *
1250  * - How to deal with modifications made via a memory map.
1251  *
1252  * - Future cache coherency support will require cache coherency API calls
1253  *   both prior to and after the call to the underlying VFS.
1254  *
1255  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
1256  * new VFS equivalents (NMKDIR).
1257  */
1258 
1259 static
1260 int
1261 journal_setattr(struct vop_setattr_args *ap)
1262 {
1263     struct mount *mp;
1264     struct journal *jo;
1265     struct jrecord jrec;
1266     void *save;		/* warning, save pointers do not always remain valid */
1267     int error;
1268 
1269     error = vop_journal_operate_ap(&ap->a_head);
1270     mp = ap->a_head.a_ops->vv_mount;
1271     if (error == 0) {
1272 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1273 	    jrecord_init(jo, &jrec, -1);
1274 	    save = jrecord_push(&jrec, JTYPE_SETATTR);
1275 	    jrecord_pop(&jrec, save);
1276 	    jrecord_done(&jrec, 0);
1277 	}
1278     }
1279     return (error);
1280 }
1281 
1282 static
1283 int
1284 journal_write(struct vop_write_args *ap)
1285 {
1286     struct mount *mp;
1287     struct journal *jo;
1288     struct jrecord jrec;
1289     void *save;		/* warning, save pointers do not always remain valid */
1290     int error;
1291 
1292     error = vop_journal_operate_ap(&ap->a_head);
1293     mp = ap->a_head.a_ops->vv_mount;
1294     if (error == 0) {
1295 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1296 	    jrecord_init(jo, &jrec, -1);
1297 	    save = jrecord_push(&jrec, JTYPE_WRITE);
1298 	    jrecord_pop(&jrec, save);
1299 	    jrecord_done(&jrec, 0);
1300 	}
1301     }
1302     return (error);
1303 }
1304 
1305 static
1306 int
1307 journal_fsync(struct vop_fsync_args *ap)
1308 {
1309     struct mount *mp;
1310     struct journal *jo;
1311     int error;
1312 
1313     error = vop_journal_operate_ap(&ap->a_head);
1314     mp = ap->a_head.a_ops->vv_mount;
1315     if (error == 0) {
1316 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1317 	    /* XXX synchronize pending journal records */
1318 	}
1319     }
1320     return (error);
1321 }
1322 
1323 static
1324 int
1325 journal_putpages(struct vop_putpages_args *ap)
1326 {
1327     struct mount *mp;
1328     struct journal *jo;
1329     struct jrecord jrec;
1330     void *save;		/* warning, save pointers do not always remain valid */
1331     int error;
1332 
1333     error = vop_journal_operate_ap(&ap->a_head);
1334     mp = ap->a_head.a_ops->vv_mount;
1335     if (error == 0) {
1336 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1337 	    jrecord_init(jo, &jrec, -1);
1338 	    save = jrecord_push(&jrec, JTYPE_PUTPAGES);
1339 	    jrecord_pop(&jrec, save);
1340 	    jrecord_done(&jrec, 0);
1341 	}
1342     }
1343     return (error);
1344 }
1345 
1346 static
1347 int
1348 journal_setacl(struct vop_setacl_args *ap)
1349 {
1350     struct mount *mp;
1351     struct journal *jo;
1352     struct jrecord jrec;
1353     void *save;		/* warning, save pointers do not always remain valid */
1354     int error;
1355 
1356     error = vop_journal_operate_ap(&ap->a_head);
1357     mp = ap->a_head.a_ops->vv_mount;
1358     if (error == 0) {
1359 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1360 	    jrecord_init(jo, &jrec, -1);
1361 	    save = jrecord_push(&jrec, JTYPE_SETACL);
1362 	    jrecord_pop(&jrec, save);
1363 	    jrecord_done(&jrec, 0);
1364 	}
1365     }
1366     return (error);
1367 }
1368 
1369 static
1370 int
1371 journal_setextattr(struct vop_setextattr_args *ap)
1372 {
1373     struct mount *mp;
1374     struct journal *jo;
1375     struct jrecord jrec;
1376     void *save;		/* warning, save pointers do not always remain valid */
1377     int error;
1378 
1379     error = vop_journal_operate_ap(&ap->a_head);
1380     mp = ap->a_head.a_ops->vv_mount;
1381     if (error == 0) {
1382 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1383 	    jrecord_init(jo, &jrec, -1);
1384 	    save = jrecord_push(&jrec, JTYPE_SETEXTATTR);
1385 	    jrecord_pop(&jrec, save);
1386 	    jrecord_done(&jrec, 0);
1387 	}
1388     }
1389     return (error);
1390 }
1391 
1392 static
1393 int
1394 journal_ncreate(struct vop_ncreate_args *ap)
1395 {
1396     struct mount *mp;
1397     struct journal *jo;
1398     struct jrecord jrec;
1399     void *save;		/* warning, save pointers do not always remain valid */
1400     int error;
1401 
1402     error = vop_journal_operate_ap(&ap->a_head);
1403     mp = ap->a_head.a_ops->vv_mount;
1404     if (error == 0) {
1405 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1406 	    jrecord_init(jo, &jrec, -1);
1407 	    save = jrecord_push(&jrec, JTYPE_CREATE);
1408 	    jrecord_pop(&jrec, save);
1409 	    jrecord_done(&jrec, 0);
1410 	}
1411     }
1412     return (error);
1413 }
1414 
1415 static
1416 int
1417 journal_nmknod(struct vop_nmknod_args *ap)
1418 {
1419     struct mount *mp;
1420     struct journal *jo;
1421     struct jrecord jrec;
1422     void *save;		/* warning, save pointers do not always remain valid */
1423     int error;
1424 
1425     error = vop_journal_operate_ap(&ap->a_head);
1426     mp = ap->a_head.a_ops->vv_mount;
1427     if (error == 0) {
1428 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1429 	    jrecord_init(jo, &jrec, -1);
1430 	    save = jrecord_push(&jrec, JTYPE_MKNOD);
1431 	    jrecord_pop(&jrec, save);
1432 	    jrecord_done(&jrec, 0);
1433 	}
1434     }
1435     return (error);
1436 }
1437 
1438 static
1439 int
1440 journal_nlink(struct vop_nlink_args *ap)
1441 {
1442     struct mount *mp;
1443     struct journal *jo;
1444     struct jrecord jrec;
1445     void *save;		/* warning, save pointers do not always remain valid */
1446     int error;
1447 
1448     error = vop_journal_operate_ap(&ap->a_head);
1449     mp = ap->a_head.a_ops->vv_mount;
1450     if (error == 0) {
1451 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1452 	    jrecord_init(jo, &jrec, -1);
1453 	    save = jrecord_push(&jrec, JTYPE_LINK);
1454 	    jrecord_pop(&jrec, save);
1455 	    jrecord_done(&jrec, 0);
1456 	}
1457     }
1458     return (error);
1459 }
1460 
1461 static
1462 int
1463 journal_nsymlink(struct vop_nsymlink_args *ap)
1464 {
1465     struct mount *mp;
1466     struct journal *jo;
1467     struct jrecord jrec;
1468     void *save;		/* warning, save pointers do not always remain valid */
1469     int error;
1470 
1471     error = vop_journal_operate_ap(&ap->a_head);
1472     mp = ap->a_head.a_ops->vv_mount;
1473     if (error == 0) {
1474 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1475 	    jrecord_init(jo, &jrec, -1);
1476 	    save = jrecord_push(&jrec, JTYPE_SYMLINK);
1477 	    jrecord_pop(&jrec, save);
1478 	    jrecord_done(&jrec, 0);
1479 	}
1480     }
1481     return (error);
1482 }
1483 
1484 static
1485 int
1486 journal_nwhiteout(struct vop_nwhiteout_args *ap)
1487 {
1488     struct mount *mp;
1489     struct journal *jo;
1490     struct jrecord jrec;
1491     void *save;		/* warning, save pointers do not always remain valid */
1492     int error;
1493 
1494     error = vop_journal_operate_ap(&ap->a_head);
1495     mp = ap->a_head.a_ops->vv_mount;
1496     if (error == 0) {
1497 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1498 	    jrecord_init(jo, &jrec, -1);
1499 	    save = jrecord_push(&jrec, JTYPE_WHITEOUT);
1500 	    jrecord_pop(&jrec, save);
1501 	    jrecord_done(&jrec, 0);
1502 	}
1503     }
1504     return (error);
1505 }
1506 
1507 static
1508 int
1509 journal_nremove(struct vop_nremove_args *ap)
1510 {
1511     struct mount *mp;
1512     struct journal *jo;
1513     struct jrecord jrec;
1514     void *save;		/* warning, save pointers do not always remain valid */
1515     int error;
1516 
1517     error = vop_journal_operate_ap(&ap->a_head);
1518     mp = ap->a_head.a_ops->vv_mount;
1519     if (error == 0) {
1520 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1521 	    jrecord_init(jo, &jrec, -1);
1522 	    save = jrecord_push(&jrec, JTYPE_REMOVE);
1523 	    jrecord_pop(&jrec, save);
1524 	    jrecord_done(&jrec, 0);
1525 	}
1526     }
1527     return (error);
1528 }
1529 
1530 static
1531 int
1532 journal_nmkdir(struct vop_nmkdir_args *ap)
1533 {
1534     struct mount *mp;
1535     struct journal *jo;
1536     struct jrecord jrec;
1537     void *save;		/* warning, save pointers do not always remain valid */
1538     int error;
1539 
1540     error = vop_journal_operate_ap(&ap->a_head);
1541     mp = ap->a_head.a_ops->vv_mount;
1542     if (error == 0) {
1543 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1544 	    jrecord_init(jo, &jrec, -1);
1545 	    if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
1546 		save = jrecord_push(&jrec, JTYPE_UNDO);
1547 		/* XXX undo operations */
1548 		jrecord_pop(&jrec, save);
1549 	    }
1550 #if 0
1551 	    if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
1552 		jrecord_write_audit(&jrec);
1553 	    }
1554 #endif
1555 	    save = jrecord_push(&jrec, JTYPE_MKDIR);
1556 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1557 	    jrecord_write_vattr(&jrec, ap->a_vap);
1558 	    jrecord_pop(&jrec, save);
1559 	    jrecord_done(&jrec, 0);
1560 	}
1561     }
1562     return (error);
1563 }
1564 
1565 
1566 static
1567 int
1568 journal_nrmdir(struct vop_nrmdir_args *ap)
1569 {
1570     struct mount *mp;
1571     struct journal *jo;
1572     struct jrecord jrec;
1573     void *save;		/* warning, save pointers do not always remain valid */
1574     int error;
1575 
1576     error = vop_journal_operate_ap(&ap->a_head);
1577     mp = ap->a_head.a_ops->vv_mount;
1578     if (error == 0) {
1579 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1580 	    jrecord_init(jo, &jrec, -1);
1581 	    save = jrecord_push(&jrec, JTYPE_RMDIR);
1582 	    jrecord_pop(&jrec, save);
1583 	    jrecord_done(&jrec, 0);
1584 	}
1585     }
1586     return (error);
1587 }
1588 
1589 static
1590 int
1591 journal_nrename(struct vop_nrename_args *ap)
1592 {
1593     struct mount *mp;
1594     struct journal *jo;
1595     struct jrecord jrec;
1596     void *save;		/* warning, save pointers do not always remain valid */
1597     int error;
1598 
1599     error = vop_journal_operate_ap(&ap->a_head);
1600     mp = ap->a_head.a_ops->vv_mount;
1601     if (error == 0) {
1602 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1603 	    jrecord_init(jo, &jrec, -1);
1604 	    save = jrecord_push(&jrec, JTYPE_RENAME);
1605 	    jrecord_pop(&jrec, save);
1606 	    jrecord_done(&jrec, 0);
1607 	}
1608     }
1609     return (error);
1610 }
1611 
1612