xref: /dragonfly/sys/vfs/hammer/hammer_mirror.c (revision 60233e58)
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/vfs/hammer/hammer_mirror.c,v 1.17 2008/07/31 22:30:33 dillon Exp $
35  */
36 /*
37  * HAMMER mirroring ioctls - serialize and deserialize modifications made
38  *			     to a filesystem.
39  */
40 
41 #include "hammer.h"
42 
43 static int hammer_mirror_check(hammer_cursor_t cursor,
44 				struct hammer_ioc_mrecord_rec *mrec);
45 static int hammer_mirror_update(hammer_cursor_t cursor,
46 				struct hammer_ioc_mrecord_rec *mrec);
47 static int hammer_mirror_write(hammer_cursor_t cursor,
48 				struct hammer_ioc_mrecord_rec *mrec,
49 				char *udata);
50 static int hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
51 				struct hammer_ioc_mrecord_rec *mrec,
52 				struct hammer_ioc_mirror_rw *mirror,
53 				u_int32_t localization,
54 				char *uptr);
55 static int hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
56 				struct hammer_ioc_mrecord_rec *mrec,
57 				struct hammer_ioc_mirror_rw *mirror,
58 				u_int32_t localization);
59 static int hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
60 				struct hammer_ioc_mrecord_skip *mrec,
61 				struct hammer_ioc_mirror_rw *mirror,
62 				u_int32_t localization);
63 static int hammer_mirror_delete_to(hammer_cursor_t cursor,
64 			        struct hammer_ioc_mirror_rw *mirror);
65 static int hammer_mirror_localize_data(hammer_data_ondisk_t data,
66 				hammer_btree_leaf_elm_t leaf);
67 
68 /*
69  * All B-Tree records within the specified key range which also conform
70  * to the transaction id range are returned.  Mirroring code keeps track
71  * of the last transaction id fully scanned and can efficiently pick up
72  * where it left off if interrupted.
73  *
74  * The PFS is identified in the mirror structure.  The passed ip is just
75  * some directory in the overall HAMMER filesystem and has nothing to
76  * do with the PFS.
 *
 * Records are serialized into mirror->ubuf.  The scan stops when the
 * user buffer fills, a signal is pending (EINTR is translated to the
 * HAMMER_IOC_HEAD_INTR flag), or the key range is exhausted; in every
 * case mirror->key_cur records how far the scan got so the caller can
 * resume with another ioctl.
77  */
78 int
79 hammer_ioc_mirror_read(hammer_transaction_t trans, hammer_inode_t ip,
80 		       struct hammer_ioc_mirror_rw *mirror)
81 {
82 	struct hammer_cmirror cmirror;
83 	struct hammer_cursor cursor;
84 	union hammer_ioc_mrecord_any mrec;
85 	hammer_btree_leaf_elm_t elm;
86 	const int crc_start = HAMMER_MREC_CRCOFF;
87 	char *uptr;
88 	int error;
89 	int data_len;
90 	int bytes;
91 	int eatdisk;
92 	u_int32_t localization;
93 	u_int32_t rec_crc;
94 
	/* PFS id occupies the upper 16 bits of the localization field */
95 	localization = (u_int32_t)mirror->pfs_id << 16;
96 
	/* caller-supplied keys must not already carry a PFS localization */
97 	if ((mirror->key_beg.localization | mirror->key_end.localization) &
98 	    HAMMER_LOCALIZE_PSEUDOFS_MASK) {
99 		return(EINVAL);
100 	}
101 	if (hammer_btree_cmp(&mirror->key_beg, &mirror->key_end) > 0)
102 		return(EINVAL);
103 
104 	mirror->key_cur = mirror->key_beg;
105 	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
106 	mirror->key_cur.localization += localization;
107 	bzero(&mrec, sizeof(mrec));
108 	bzero(&cmirror, sizeof(cmirror));
109 
	/* restart point after an EDEADLK from the B-Tree cursor */
110 retry:
111 	error = hammer_init_cursor(trans, &cursor, NULL, NULL);
112 	if (error) {
113 		hammer_done_cursor(&cursor);
114 		goto failed;
115 	}
116 	cursor.key_beg = mirror->key_cur;
117 	cursor.key_end = mirror->key_end;
118 	cursor.key_end.localization &= HAMMER_LOCALIZE_MASK;
119 	cursor.key_end.localization += localization;
120 
121 	cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE;
122 	cursor.flags |= HAMMER_CURSOR_BACKEND;
123 
124 	/*
125 	 * This flag filters the search to only return elements whose create
126 	 * or delete TID is >= mirror_tid.  The B-Tree uses the mirror_tid
127 	 * field stored with internal and leaf nodes to shortcut the scan.
128 	 */
129 	cursor.flags |= HAMMER_CURSOR_MIRROR_FILTERED;
130 	cursor.cmirror = &cmirror;
131 	cmirror.mirror_tid = mirror->tid_beg;
132 
133 	error = hammer_btree_first(&cursor);
134 	while (error == 0) {
135 		/*
136 		 * Yield to more important tasks
137 		 */
138 		if (error == 0) {
139 			error = hammer_signal_check(trans->hmp);
140 			if (error)
141 				break;
142 		}
143 
144 		/*
145 		 * An internal node can be returned in mirror-filtered
146 		 * mode and indicates that the scan is returning a skip
147 		 * range in the cursor->cmirror structure.
148 		 */
149 		uptr = (char *)mirror->ubuf + mirror->count;
150 		if (cursor.node->ondisk->type == HAMMER_BTREE_TYPE_INTERNAL) {
151 			/*
152 			 * Check space
153 			 */
154 			mirror->key_cur = cmirror.skip_beg;
155 			bytes = sizeof(mrec.skip);
156 			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
157 			    mirror->size) {
158 				break;
159 			}
160 
161 			/*
162 			 * Fill mrec
163 			 */
164 			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
165 			mrec.head.type = HAMMER_MREC_TYPE_SKIP;
166 			mrec.head.rec_size = bytes;
167 			mrec.skip.skip_beg = cmirror.skip_beg;
168 			mrec.skip.skip_end = cmirror.skip_end;
169 			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
170 						 bytes - crc_start);
171 			error = copyout(&mrec, uptr, bytes);
172 			eatdisk = 0;
173 			goto didwrite;
174 		}
175 
176 		/*
177 		 * Leaf node.  In full-history mode we could filter out
178 		 * elements modified outside the user-requested TID range.
179 		 *
180 		 * However, such elements must be returned so the writer
181 		 * can compare them against the target to determine what
182 		 * needs to be deleted on the target, particularly for
183 		 * no-history mirrors.
184 		 */
185 		KKASSERT(cursor.node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
186 		elm = &cursor.node->ondisk->elms[cursor.index].leaf;
187 		mirror->key_cur = elm->base;
188 
189 		/*
190 		 * Determine if we should generate a PASS or a REC.  PASS
191 		 * records are records without any data payload.  Such
192 		 * records will be generated if the target is already expected
193 		 * to have the record, allowing it to delete the gaps.
194 		 *
195 		 * A PASS record is also used to perform deletions on the
196 		 * target.
197 		 *
198 		 * Such deletions are needed if the master or files on the
199 		 * master are no-history, or if the slave is so far behind
200 		 * the master has already been pruned.
201 		 */
202 		if (elm->base.create_tid < mirror->tid_beg ||
203 		    elm->base.create_tid > mirror->tid_end) {
204 			bytes = sizeof(mrec.rec);
205 			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
206 			    mirror->size) {
207 				break;
208 			}
209 
210 			/*
211 			 * Fill mrec.
212 			 */
213 			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
214 			mrec.head.type = HAMMER_MREC_TYPE_PASS;
215 			mrec.head.rec_size = bytes;
216 			mrec.rec.leaf = *elm;
217 			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
218 						 bytes - crc_start);
219 			error = copyout(&mrec, uptr, bytes);
220 			eatdisk = 1;
221 			goto didwrite;
222 
223 		}
224 
225 		/*
226 		 * The core code exports the data to userland.
227 		 */
228 		data_len = (elm->data_offset) ? elm->data_len : 0;
229 		if (data_len) {
230 			error = hammer_btree_extract(&cursor,
231 						     HAMMER_CURSOR_GET_DATA);
232 			if (error)
233 				break;
234 		}
235 
236 		bytes = sizeof(mrec.rec) + data_len;
237 		if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) > mirror->size)
238 			break;
239 
240 		/*
241 		 * Construct the record for userland and copyout.
242 		 *
243 		 * The user is asking for a snapshot, if the record was
244 		 * deleted beyond the user-requested ending tid, the record
245 		 * is not considered deleted from the point of view of
246 		 * userland and delete_tid is cleared.
247 		 */
248 		mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
249 		mrec.head.type = HAMMER_MREC_TYPE_REC;
250 		mrec.head.rec_size = bytes;
251 		mrec.rec.leaf = *elm;
252 		if (elm->base.delete_tid > mirror->tid_end)
253 			mrec.rec.leaf.base.delete_tid = 0;
		/* CRC covers the record header past the crc field, then the data payload */
254 		rec_crc = crc32(&mrec.head.rec_size,
255 				sizeof(mrec.rec) - crc_start);
256 		if (data_len)
257 			rec_crc = crc32_ext(cursor.data, data_len, rec_crc);
258 		mrec.head.rec_crc = rec_crc;
259 		error = copyout(&mrec, uptr, sizeof(mrec.rec));
260 		if (data_len && error == 0) {
261 			error = copyout(cursor.data, uptr + sizeof(mrec.rec),
262 					data_len);
263 		}
264 		eatdisk = 1;
265 
266 		/*
267 		 * eatdisk controls whether we skip the current cursor
268 		 * position on the next scan or not.  If doing a SKIP
269 		 * the cursor is already positioned properly for the next
270 		 * scan and eatdisk will be 0.
271 		 */
272 didwrite:
273 		if (error == 0) {
274 			mirror->count += HAMMER_HEAD_DOALIGN(bytes);
275 			if (eatdisk)
276 				cursor.flags |= HAMMER_CURSOR_ATEDISK;
277 			else
278 				cursor.flags &= ~HAMMER_CURSOR_ATEDISK;
279 			error = hammer_btree_iterate(&cursor);
280 		}
281 	}
	/* ENOENT simply means the scan ran off the end of the key range */
282 	if (error == ENOENT) {
283 		mirror->key_cur = mirror->key_end;
284 		error = 0;
285 	}
286 	hammer_done_cursor(&cursor);
287 	if (error == EDEADLK)
288 		goto retry;
289 	if (error == EINTR) {
290 		mirror->head.flags |= HAMMER_IOC_HEAD_INTR;
291 		error = 0;
292 	}
293 failed:
294 	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
295 	return(error);
296 }
297 
298 /*
299  * Copy records from userland to the target mirror.
300  *
301  * The PFS is identified in the mirror structure.  The passed ip is just
302  * some directory in the overall HAMMER filesystem and has nothing to
303  * do with the PFS.  In fact, there might not even be a root directory for
304  * the PFS yet!
 *
 * Consumes SKIP/REC/PASS mrecords from mirror->ubuf one at a time,
 * dispatching each to the matching helper.  Always returns 0; a real
 * error is reported via mirror->head.flags/error because the ioctl
 * layer does not copy the RW structure back on a non-zero return.
305  */
306 int
307 hammer_ioc_mirror_write(hammer_transaction_t trans, hammer_inode_t ip,
308 		       struct hammer_ioc_mirror_rw *mirror)
309 {
310 	union hammer_ioc_mrecord_any mrec;
311 	struct hammer_cursor cursor;
312 	u_int32_t localization;
313 	int checkspace_count = 0;
314 	int error;
315 	int bytes;
316 	char *uptr;
317 	int seq;
318 
319 	localization = (u_int32_t)mirror->pfs_id << 16;
320 	seq = trans->hmp->flusher.act;
321 
322 	/*
323 	 * Validate the mirror structure and relocalize the tracking keys.
324 	 */
325 	if (mirror->size < 0 || mirror->size > 0x70000000)
326 		return(EINVAL);
327 	mirror->key_beg.localization &= HAMMER_LOCALIZE_MASK;
328 	mirror->key_beg.localization += localization;
329 	mirror->key_end.localization &= HAMMER_LOCALIZE_MASK;
330 	mirror->key_end.localization += localization;
331 	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
332 	mirror->key_cur.localization += localization;
333 
334 	/*
335 	 * Set up our tracking cursor for the loop.  The tracking cursor
336 	 * is used to delete records that are no longer present on the
337 	 * master.  The last handled record at key_cur must be skipped.
	 *
	 * NOTE(review): the error from hammer_init_cursor() is overwritten
	 * by hammer_btree_first() below without being checked — confirm an
	 * init failure is impossible here or that the cursor remains safe
	 * to use.
338 	 */
339 	error = hammer_init_cursor(trans, &cursor, NULL, NULL);
340 
341 	cursor.key_beg = mirror->key_cur;
342 	cursor.key_end = mirror->key_end;
343 	cursor.flags |= HAMMER_CURSOR_BACKEND;
344 	error = hammer_btree_first(&cursor);
345 	if (error == 0)
346 		cursor.flags |= HAMMER_CURSOR_ATEDISK;
347 	if (error == ENOENT)
348 		error = 0;
349 
350 	/*
351 	 * Loop until our input buffer has been exhausted.
352 	 */
353 	while (error == 0 &&
354 		mirror->count + sizeof(mrec.head) <= mirror->size) {
355 
356 	        /*
357 		 * Don't blow out the buffer cache.  Leave room for frontend
358 		 * cache as well.
359 		 */
360 		while (hammer_flusher_meta_halflimit(trans->hmp) ||
361 		       hammer_flusher_undo_exhausted(trans, 2)) {
362 			hammer_unlock_cursor(&cursor);
363 			hammer_flusher_wait(trans->hmp, seq);
364 			hammer_lock_cursor(&cursor);
365 			seq = hammer_flusher_async_one(trans->hmp);
366 		}
367 
368 		/*
369 		 * If there is insufficient free space it may be due to
370 		 * reserved bigblocks, which flushing might fix.
	 	 *
		 * Give up with ENOSPC after ten flush attempts.
371 		 */
372 		if (hammer_checkspace(trans->hmp, HAMMER_CHKSPC_MIRROR)) {
373 			if (++checkspace_count == 10) {
374 				error = ENOSPC;
375 				break;
376 			}
377 			hammer_unlock_cursor(&cursor);
378 			hammer_flusher_wait(trans->hmp, seq);
379 			hammer_lock_cursor(&cursor);
380 			seq = hammer_flusher_async(trans->hmp, NULL);
381 		}
382 
383 
384 		/*
385 		 * Acquire and validate header
		 *
		 * NOTE(review): the signature and rec_size are validated but
		 * mrec.head.rec_crc does not appear to be checked anywhere in
		 * the write path — confirm whether CRC validation is intended
		 * here.
386 		 */
387 		if ((bytes = mirror->size - mirror->count) > sizeof(mrec))
388 			bytes = sizeof(mrec);
389 		uptr = (char *)mirror->ubuf + mirror->count;
390 		error = copyin(uptr, &mrec, bytes);
391 		if (error)
392 			break;
393 		if (mrec.head.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
394 			error = EINVAL;
395 			break;
396 		}
397 		if (mrec.head.rec_size < sizeof(mrec.head) ||
398 		    mrec.head.rec_size > sizeof(mrec) + HAMMER_XBUFSIZE ||
399 		    mirror->count + mrec.head.rec_size > mirror->size) {
400 			error = EINVAL;
401 			break;
402 		}
403 
404 		switch(mrec.head.type) {
405 		case HAMMER_MREC_TYPE_SKIP:
406 			if (mrec.head.rec_size != sizeof(mrec.skip))
407 				error = EINVAL;
408 			if (error == 0)
409 				error = hammer_ioc_mirror_write_skip(&cursor, &mrec.skip, mirror, localization);
410 			break;
411 		case HAMMER_MREC_TYPE_REC:
412 			if (mrec.head.rec_size < sizeof(mrec.rec))
413 				error = EINVAL;
414 			if (error == 0)
415 				error = hammer_ioc_mirror_write_rec(&cursor, &mrec.rec, mirror, localization, uptr + sizeof(mrec.rec));
416 			break;
417 		case HAMMER_MREC_TYPE_PASS:
418 			if (mrec.head.rec_size != sizeof(mrec.rec))
419 				error = EINVAL;
420 			if (error == 0)
421 				error = hammer_ioc_mirror_write_pass(&cursor, &mrec.rec, mirror, localization);
422 			break;
423 		default:
424 			error = EINVAL;
425 			break;
426 		}
427 
428 		/*
429 		 * Retry the current record on deadlock, otherwise setup
430 		 * for the next loop.  EALREADY (record already present)
		 * is not an error; the record is simply consumed.
431 		 */
432 		if (error == EDEADLK) {
433 			while (error == EDEADLK) {
434 				hammer_recover_cursor(&cursor);
435 				error = hammer_cursor_upgrade(&cursor);
436 			}
437 		} else {
438 			if (error == EALREADY)
439 				error = 0;
440 			if (error == 0) {
441 				mirror->count +=
442 					HAMMER_HEAD_DOALIGN(mrec.head.rec_size);
443 			}
444 		}
445 	}
446 	hammer_done_cursor(&cursor);
447 
448 	/*
449 	 * cumulative error
450 	 */
451 	if (error) {
452 		mirror->head.flags |= HAMMER_IOC_HEAD_ERROR;
453 		mirror->head.error = error;
454 	}
455 
456 	/*
457 	 * ioctls don't update the RW data structure if an error is returned,
458 	 * always return 0.
459 	 */
460 	return(0);
461 }
462 
463 /*
464  * Handle skip records.
465  *
466  * We must iterate from the last resolved record position at mirror->key_cur
467  * to skip_beg and delete any records encountered.
468  *
469  * mirror->key_cur must be carefully set when we succeed in processing
470  * this mrec.
 *
 * Returns 0 on success (including ENOENT from the re-lookup, which just
 * means nothing exists at skip_end yet) or an errno such as EDEADLK.
471  */
472 static int
473 hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
474 			     struct hammer_ioc_mrecord_skip *mrec,
475 			     struct hammer_ioc_mirror_rw *mirror,
476 			     u_int32_t localization)
477 {
478 	int error;
479 
480 	/*
481 	 * Relocalize the skip range
482 	 */
483 	mrec->skip_beg.localization &= HAMMER_LOCALIZE_MASK;
484 	mrec->skip_beg.localization += localization;
485 	mrec->skip_end.localization &= HAMMER_LOCALIZE_MASK;
486 	mrec->skip_end.localization += localization;
487 
488 	/*
489 	 * Iterate from current position to skip_beg, deleting any records
490 	 * we encounter.
491 	 */
492 	cursor->key_end = mrec->skip_beg;
493 	cursor->flags |= HAMMER_CURSOR_BACKEND;
494 	error = hammer_mirror_delete_to(cursor, mirror);
495 
496 	/*
497 	 * Now skip past the skip (which is the whole point of
498 	 * having a skip record).  The sender has not sent us any records
499 	 * for the skip area so we wouldn't know what to keep and what
500 	 * to delete anyway.
501 	 *
502 	 * Clear ATEDISK because skip_end is non-inclusive, so we can't
503 	 * count an exact match if we happened to get one.
504 	 */
505 	if (error == 0) {
506 		mirror->key_cur = mrec->skip_end;
507 		cursor->key_beg = mrec->skip_end;
508 		error = hammer_btree_lookup(cursor);
509 		cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
510 		if (error == ENOENT)
511 			error = 0;
512 	}
513 	return(error);
514 }
515 
516 /*
517  * Handle B-Tree records.
518  *
519  * We must iterate to mrec->base.key (non-inclusively), and then process
520  * the record.  We are allowed to write a new record or delete an existing
521  * record, but cannot replace an existing record.
522  *
523  * mirror->key_cur must be carefully set when we succeed in processing
524  * this mrec.
 *
 * Returns 0, EALREADY (record already present on target), EDEADLK (caller
 * retries), or another errno.  uptr points at the record's data payload in
 * the user buffer.
525  */
526 static int
527 hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
528 			    struct hammer_ioc_mrecord_rec *mrec,
529 			    struct hammer_ioc_mirror_rw *mirror,
530 			    u_int32_t localization,
531 			    char *uptr)
532 {
533 	hammer_transaction_t trans;
534 	u_int32_t rec_crc;
535 	int error;
536 
	/*
	 * NOTE(review): both trans and rec_crc are computed here but never
	 * used afterwards — rec_crc in particular is never compared against
	 * mrec->head.rec_crc.  Confirm whether CRC validation was intended.
	 */
537 	trans = cursor->trans;
538 	rec_crc = crc32(mrec, sizeof(*mrec));
539 
	/* sanity-check the caller-supplied data length against the record */
540 	if (mrec->leaf.data_len < 0 ||
541 	    mrec->leaf.data_len > HAMMER_XBUFSIZE ||
542 	    mrec->leaf.data_len + sizeof(*mrec) > mrec->head.rec_size) {
543 		return(EINVAL);
544 	}
545 
546 	/*
547 	 * Re-localize for target.  relocalization of data is handled
548 	 * by hammer_mirror_write().
549 	 */
550 	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
551 	mrec->leaf.base.localization += localization;
552 
553 	/*
554 	 * Delete records through until we reach (non-inclusively) the
555 	 * target record.
556 	 */
557 	cursor->key_end = mrec->leaf.base;
558 	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
559 	cursor->flags |= HAMMER_CURSOR_BACKEND;
560 	error = hammer_mirror_delete_to(cursor, mirror);
561 
562 	/*
563 	 * Locate the record.
564 	 *
565 	 * If the record exists only the delete_tid may be updated.
566 	 *
567 	 * If the record does not exist we can create it only if the
568 	 * create_tid is not too old.  If the create_tid is too old
569 	 * it may have already been destroyed on the slave from pruning.
570 	 *
571 	 * Note that mirror operations are effectively as-of operations
572 	 * and delete_tid can be 0 for mirroring purposes even if it is
573 	 * not actually 0 at the originator.
574 	 *
575 	 * These functions can return EDEADLK
576 	 */
577 	cursor->key_beg = mrec->leaf.base;
578 	cursor->flags |= HAMMER_CURSOR_BACKEND;
579 	cursor->flags &= ~HAMMER_CURSOR_INSERT;
580 	error = hammer_btree_lookup(cursor);
581 
582 	if (error == 0 && hammer_mirror_check(cursor, mrec)) {
583 		error = hammer_mirror_update(cursor, mrec);
584 	} else if (error == ENOENT) {
585 		if (mrec->leaf.base.create_tid >= mirror->tid_beg)
586 			error = hammer_mirror_write(cursor, mrec, uptr);
587 		else
588 			error = 0;
589 	}
590 	if (error == 0 || error == EALREADY)
591 		mirror->key_cur = mrec->leaf.base;
592 	return(error);
593 }
594 
595 /*
596  * This works like write_rec but no write or update is necessary,
597  * and no data payload is included so we couldn't do a write even
598  * if we wanted to.
599  *
600  * We must still iterate for deletions, and we can validate the
601  * record header which is a good way to test for corrupted mirror
602  * targets XXX.
603  *
604  * mirror->key_cur must be carefully set when we succeed in processing
605  * this mrec.
606  */
607 static
608 int
609 hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
610 			     struct hammer_ioc_mrecord_rec *mrec,
611 			     struct hammer_ioc_mirror_rw *mirror,
612 			     u_int32_t localization)
613 {
614 	hammer_transaction_t trans;
615 	u_int32_t rec_crc;
616 	int error;
617 
	/*
	 * NOTE(review): as in hammer_ioc_mirror_write_rec(), trans and
	 * rec_crc are computed but never used — the CRC is not compared
	 * against mrec->head.rec_crc.  Confirm intent (see XXX above).
	 */
618 	trans = cursor->trans;
619 	rec_crc = crc32(mrec, sizeof(*mrec));
620 
621 	/*
622 	 * Re-localize for target.  Relocalization of data is handled
623 	 * by hammer_mirror_write().
624 	 */
625 	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
626 	mrec->leaf.base.localization += localization;
627 
628 	/*
629 	 * Delete records through until we reach (non-inclusively) the
630 	 * target record.
631 	 */
632 	cursor->key_end = mrec->leaf.base;
633 	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
634 	cursor->flags |= HAMMER_CURSOR_BACKEND;
635 
636 	error = hammer_mirror_delete_to(cursor, mirror);
637 
638 	/*
639 	 * Locate the record and get past it by setting ATEDISK.  Perform
640 	 * any necessary deletions.  We have no data payload and cannot
641 	 * create a new record.
642 	 */
643 	if (error == 0) {
644 		mirror->key_cur = mrec->leaf.base;
645 		cursor->key_beg = mrec->leaf.base;
646 		cursor->flags |= HAMMER_CURSOR_BACKEND;
647 		cursor->flags &= ~HAMMER_CURSOR_INSERT;
648 		error = hammer_btree_lookup(cursor);
649 		if (error == 0) {
650 			if (hammer_mirror_check(cursor, mrec))
651 				error = hammer_mirror_update(cursor, mrec);
652 			cursor->flags |= HAMMER_CURSOR_ATEDISK;
653 		} else {
654 			cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
655 		}
		/* a missing record is fine for a PASS; nothing to delete */
656 		if (error == ENOENT)
657 			error = 0;
658 	}
659 	return(error);
660 }
661 
662 /*
663  * As part of the mirror write we iterate across swaths of records
664  * on the target which no longer exist on the source, and mark them
665  * deleted.
666  *
667  * The caller has indexed the cursor and set up key_end.  We iterate
668  * through to key_end.
 *
 * Returns 0 when the sweep completes (ENOENT from the iterator is the
 * normal termination and is translated to 0) or an errno such as EDEADLK.
669  */
670 static
671 int
672 hammer_mirror_delete_to(hammer_cursor_t cursor,
673 		       struct hammer_ioc_mirror_rw *mirror)
674 {
675 	hammer_btree_leaf_elm_t elm;
676 	int error;
677 
678 	error = hammer_btree_iterate(cursor);
679 	while (error == 0) {
680 		elm = &cursor->node->ondisk->elms[cursor->index].leaf;
681 		KKASSERT(elm->base.btype == HAMMER_BTREE_TYPE_RECORD);
682 		cursor->flags |= HAMMER_CURSOR_ATEDISK;
		/* only live records need to be marked deleted */
683 		if (elm->base.delete_tid == 0) {
684 			error = hammer_delete_at_cursor(cursor,
685 							HAMMER_DELETE_ADJUST,
686 							mirror->tid_end,
687 							time_second,
688 							1, NULL);
689 		}
690 		if (error == 0)
691 			error = hammer_btree_iterate(cursor);
692 	}
693 	if (error == ENOENT)
694 		error = 0;
695 	return(error);
696 }
697 
698 /*
699  * Check whether an update is needed in the case where a match already
700  * exists on the target.  The only type of update allowed in this case
701  * is an update of the delete_tid.
702  *
703  * Return non-zero if the update should proceed.
704  */
705 static
706 int
707 hammer_mirror_check(hammer_cursor_t cursor, struct hammer_ioc_mrecord_rec *mrec)
708 {
709 	hammer_btree_leaf_elm_t leaf = cursor->leaf;
710 
711 	if (leaf->base.delete_tid != mrec->leaf.base.delete_tid) {
712 		if (mrec->leaf.base.delete_tid != 0)
713 			return(1);
714 	}
715 	return(0);
716 }
717 
718 /*
719  * Update a record in-place.  Only the delete_tid can change, and
720  * only from zero to non-zero.
 *
 * Returns 0 or an errno from hammer_delete_at_cursor().  The cursor is
 * marked ATEDISK so the iteration moves past the record.
721  */
722 static
723 int
724 hammer_mirror_update(hammer_cursor_t cursor,
725 		     struct hammer_ioc_mrecord_rec *mrec)
726 {
727 	int error;
728 
729 	/*
730 	 * This case shouldn't occur.
731 	 */
732 	if (mrec->leaf.base.delete_tid == 0)
733 		return(0);
734 
735 	/*
736 	 * Mark the record deleted on the mirror target.
737 	 */
738 	error = hammer_delete_at_cursor(cursor, HAMMER_DELETE_ADJUST,
739 					mrec->leaf.base.delete_tid,
740 					mrec->leaf.delete_ts,
741 					1, NULL);
742 	cursor->flags |= HAMMER_CURSOR_ATEDISK;
743 	return(error);
744 }
745 
746 /*
747  * Write out a new record.
748  */
749 static
750 int
751 hammer_mirror_write(hammer_cursor_t cursor,
752 		    struct hammer_ioc_mrecord_rec *mrec,
753 		    char *udata)
754 {
755 	hammer_transaction_t trans;
756 	hammer_buffer_t data_buffer;
757 	hammer_off_t ndata_offset;
758 	hammer_tid_t high_tid;
759 	void *ndata;
760 	int error;
761 	int doprop;
762 
763 	trans = cursor->trans;
764 	data_buffer = NULL;
765 
766 	/*
767 	 * Get the sync lock so the whole mess is atomic
768 	 */
769 	hammer_sync_lock_sh(trans);
770 
771 	/*
772 	 * Allocate and adjust data
773 	 */
774 	if (mrec->leaf.data_len && mrec->leaf.data_offset) {
775 		ndata = hammer_alloc_data(trans, mrec->leaf.data_len,
776 					  mrec->leaf.base.rec_type,
777 					  &ndata_offset, &data_buffer, &error);
778 		if (ndata == NULL)
779 			return(error);
780 		mrec->leaf.data_offset = ndata_offset;
781 		hammer_modify_buffer(trans, data_buffer, NULL, 0);
782 		error = copyin(udata, ndata, mrec->leaf.data_len);
783 		if (error == 0) {
784 			if (hammer_crc_test_leaf(ndata, &mrec->leaf) == 0) {
785 				kprintf("data crc mismatch on pipe\n");
786 				error = EINVAL;
787 			} else {
788 				error = hammer_mirror_localize_data(
789 							ndata, &mrec->leaf);
790 			}
791 		}
792 		hammer_modify_buffer_done(data_buffer);
793 	} else {
794 		mrec->leaf.data_offset = 0;
795 		error = 0;
796 		ndata = NULL;
797 	}
798 	if (error)
799 		goto failed;
800 
801 	/*
802 	 * Do the insertion.  This can fail with a EDEADLK or EALREADY
803 	 */
804 	cursor->flags |= HAMMER_CURSOR_INSERT;
805 	error = hammer_btree_lookup(cursor);
806 	if (error != ENOENT) {
807 		if (error == 0)
808 			error = EALREADY;
809 		goto failed;
810 	}
811 
812 	error = hammer_btree_insert(cursor, &mrec->leaf, &doprop);
813 
814 	/*
815 	 * Cursor is left on the current element, we want to skip it now.
816 	 */
817 	cursor->flags |= HAMMER_CURSOR_ATEDISK;
818 	cursor->flags &= ~HAMMER_CURSOR_INSERT;
819 
820 	/*
821 	 * Track a count of active inodes.
822 	 */
823 	if (error == 0 &&
824 	    mrec->leaf.base.rec_type == HAMMER_RECTYPE_INODE &&
825 	    mrec->leaf.base.delete_tid == 0) {
826 		hammer_modify_volume_field(trans,
827 					   trans->rootvol,
828 					   vol0_stat_inodes);
829 		++trans->hmp->rootvol->ondisk->vol0_stat_inodes;
830 		hammer_modify_volume_done(trans->rootvol);
831 	}
832 
833 	/*
834 	 * vol0_next_tid must track the highest TID stored in the filesystem.
835 	 * We do not need to generate undo for this update.
836 	 */
837 	high_tid = mrec->leaf.base.create_tid;
838 	if (high_tid < mrec->leaf.base.delete_tid)
839 		high_tid = mrec->leaf.base.delete_tid;
840 	if (trans->rootvol->ondisk->vol0_next_tid < high_tid) {
841 		hammer_modify_volume(trans, trans->rootvol, NULL, 0);
842 		trans->rootvol->ondisk->vol0_next_tid = high_tid;
843 		hammer_modify_volume_done(trans->rootvol);
844 	}
845 
846 	if (error == 0 && doprop)
847 		hammer_btree_do_propagation(cursor, NULL, &mrec->leaf);
848 
849 failed:
850 	/*
851 	 * Cleanup
852 	 */
853 	if (error && mrec->leaf.data_offset) {
854 		hammer_blockmap_free(cursor->trans,
855 				     mrec->leaf.data_offset,
856 				     mrec->leaf.data_len);
857 	}
858 	hammer_sync_unlock(trans);
859 	if (data_buffer)
860 		hammer_rel_buffer(data_buffer, 0);
861 	return(error);
862 }
863 
864 /*
865  * Localize the data payload.  Directory entries may need their
866  * localization adjusted.
867  *
868  * PFS directory entries must be skipped entirely (return EALREADY).
869  */
870 static
871 int
872 hammer_mirror_localize_data(hammer_data_ondisk_t data,
873 			    hammer_btree_leaf_elm_t leaf)
874 {
875 	u_int32_t localization;
876 
877 	if (leaf->base.rec_type == HAMMER_RECTYPE_DIRENTRY) {
878 		if (data->entry.obj_id == HAMMER_OBJID_ROOT)
879 			return(EALREADY);
880 		localization = leaf->base.localization &
881 			       HAMMER_LOCALIZE_PSEUDOFS_MASK;
882 		if (data->entry.localization != localization) {
883 			data->entry.localization = localization;
884 			hammer_crc_set_leaf(data, leaf);
885 		}
886 	}
887 	return(0);
888 }
889 
890