1  /*
2    Unix SMB/CIFS implementation.
3 
4    trivial database library
5 
6    Copyright (C) Andrew Tridgell              2005
7 
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11 
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16 
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21 
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25 
26 #include "tdb_private.h"
27 
28 /*
29   transaction design:
30 
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35 
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40 
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45 
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51 
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55 
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58 
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62 
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66 
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70 
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77 
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83 
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88 
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94 
95     Beware: when transactions are nested, a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97 
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105 
106 
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111 	/* we keep a mirrored copy of the tdb hash heads here so
112 	   tdb_next_hash_chain() can operate efficiently */
113 	uint32_t *hash_heads;
114 
115 	/* the original io methods - used to do IOs to the real db */
116 	const struct tdb_methods *io_methods;
117 
118 	/* the list of transaction blocks. When a block is first
119 	   written to, it gets created in this list */
120 	uint8_t **blocks;
121 	uint32_t num_blocks;
122 	uint32_t block_size;      /* bytes in each block */
123 	uint32_t last_block_size; /* number of valid bytes in the last block */
124 
125 	/* non-zero when an internal transaction error has
126 	   occurred. All write operations will then fail until the
127 	   transaction is ended */
128 	int transaction_error;
129 
130 	/* when inside a transaction we need to keep track of any
131 	   nested tdb_transaction_start() calls, as these are allowed,
132 	   but don't create a new transaction */
133 	int nesting;
134 
135 	/* set when a prepare has already occurred */
136 	bool prepared;
137 	tdb_off_t magic_offset;
138 
139 	/* old file size before transaction */
140 	tdb_len_t old_map_size;
141 
142 	/* did we expand in this transaction */
143 	bool expanded;
144 };
145 
146 
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
transaction_read(struct tdb_context * tdb,tdb_off_t off,void * buf,tdb_len_t len,int cv)151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152 			    tdb_len_t len, int cv)
153 {
154 	uint32_t blk;
155 
156 	/* break it down into block sized ops */
157 	while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158 		tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159 		if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160 			return -1;
161 		}
162 		len -= len2;
163 		off += len2;
164 		buf = (void *)(len2 + (char *)buf);
165 	}
166 
167 	if (len == 0) {
168 		return 0;
169 	}
170 
171 	blk = off / tdb->transaction->block_size;
172 
173 	/* see if we have it in the block list */
174 	if (tdb->transaction->num_blocks <= blk ||
175 	    tdb->transaction->blocks[blk] == NULL) {
176 		/* nope, do a real read */
177 		if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178 			goto fail;
179 		}
180 		return 0;
181 	}
182 
183 	/* it is in the block list. Now check for the last block */
184 	if (blk == tdb->transaction->num_blocks-1) {
185 		if (len > tdb->transaction->last_block_size) {
186 			goto fail;
187 		}
188 	}
189 
190 	/* now copy it out of this block */
191 	memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192 	if (cv) {
193 		tdb_convert(buf, len);
194 	}
195 	return 0;
196 
197 fail:
198 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199 	tdb->ecode = TDB_ERR_IO;
200 	tdb->transaction->transaction_error = 1;
201 	return -1;
202 }
203 
204 
205 /*
206   write while in a transaction
207 */
transaction_write(struct tdb_context * tdb,tdb_off_t off,const void * buf,tdb_len_t len)208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209 			     const void *buf, tdb_len_t len)
210 {
211 	uint32_t blk;
212 
213 	if (buf == NULL) {
214 		return -1;
215 	}
216 
217 	/* Only a commit is allowed on a prepared transaction */
218 	if (tdb->transaction->prepared) {
219 		tdb->ecode = TDB_ERR_EINVAL;
220 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
221 		tdb->transaction->transaction_error = 1;
222 		return -1;
223 	}
224 
225 	/* if the write is to a hash head, then update the transaction
226 	   hash heads */
227 	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
228 	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
229 		uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
230 		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
231 	}
232 
233 	/* break it up into block sized chunks */
234 	while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
235 		tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
236 		if (transaction_write(tdb, off, buf, len2) != 0) {
237 			return -1;
238 		}
239 		len -= len2;
240 		off += len2;
241 		buf = (const void *)(len2 + (const char *)buf);
242 	}
243 
244 	if (len == 0) {
245 		return 0;
246 	}
247 
248 	blk = off / tdb->transaction->block_size;
249 	off = off % tdb->transaction->block_size;
250 
251 	if (tdb->transaction->num_blocks <= blk) {
252 		uint8_t **new_blocks;
253 		/* expand the blocks array */
254 		new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
255 						 (blk+1)*sizeof(uint8_t *));
256 		if (new_blocks == NULL) {
257 			tdb->ecode = TDB_ERR_OOM;
258 			goto fail;
259 		}
260 		memset(&new_blocks[tdb->transaction->num_blocks], 0,
261 		       (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
262 		tdb->transaction->blocks = new_blocks;
263 		tdb->transaction->num_blocks = blk+1;
264 		tdb->transaction->last_block_size = 0;
265 	}
266 
267 	/* allocate and fill a block? */
268 	if (tdb->transaction->blocks[blk] == NULL) {
269 		tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
270 		if (tdb->transaction->blocks[blk] == NULL) {
271 			tdb->ecode = TDB_ERR_OOM;
272 			tdb->transaction->transaction_error = 1;
273 			return -1;
274 		}
275 		if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
276 			tdb_len_t len2 = tdb->transaction->block_size;
277 			if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
278 				len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
279 			}
280 			if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
281 								   tdb->transaction->blocks[blk],
282 								   len2, 0) != 0) {
283 				SAFE_FREE(tdb->transaction->blocks[blk]);
284 				tdb->ecode = TDB_ERR_IO;
285 				goto fail;
286 			}
287 			if (blk == tdb->transaction->num_blocks-1) {
288 				tdb->transaction->last_block_size = len2;
289 			}
290 		}
291 	}
292 
293 	/* overwrite part of an existing block */
294 	memcpy(tdb->transaction->blocks[blk] + off, buf, len);
295 	if (blk == tdb->transaction->num_blocks-1) {
296 		if (len + off > tdb->transaction->last_block_size) {
297 			tdb->transaction->last_block_size = len + off;
298 		}
299 	}
300 
301 	return 0;
302 
303 fail:
304 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
305 		 (blk*tdb->transaction->block_size) + off, len));
306 	tdb->transaction->transaction_error = 1;
307 	return -1;
308 }
309 
310 
311 /*
312   write while in a transaction - this variant never expands the transaction blocks, it only
313   updates existing blocks. This means it cannot change the recovery size
314 */
transaction_write_existing(struct tdb_context * tdb,tdb_off_t off,const void * buf,tdb_len_t len)315 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
316 				      const void *buf, tdb_len_t len)
317 {
318 	uint32_t blk;
319 
320 	/* break it up into block sized chunks */
321 	while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
322 		tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
323 		if (transaction_write_existing(tdb, off, buf, len2) != 0) {
324 			return -1;
325 		}
326 		len -= len2;
327 		off += len2;
328 		if (buf != NULL) {
329 			buf = (const void *)(len2 + (const char *)buf);
330 		}
331 	}
332 
333 	if (len == 0 || buf == NULL) {
334 		return 0;
335 	}
336 
337 	blk = off / tdb->transaction->block_size;
338 	off = off % tdb->transaction->block_size;
339 
340 	if (tdb->transaction->num_blocks <= blk ||
341 	    tdb->transaction->blocks[blk] == NULL) {
342 		return 0;
343 	}
344 
345 	if (blk == tdb->transaction->num_blocks-1 &&
346 	    off + len > tdb->transaction->last_block_size) {
347 		if (off >= tdb->transaction->last_block_size) {
348 			return 0;
349 		}
350 		len = tdb->transaction->last_block_size - off;
351 	}
352 
353 	/* overwrite part of an existing block */
354 	memcpy(tdb->transaction->blocks[blk] + off, buf, len);
355 
356 	return 0;
357 }
358 
359 
360 /*
361   accelerated hash chain head search, using the cached hash heads
362 */
transaction_next_hash_chain(struct tdb_context * tdb,uint32_t * chain)363 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
364 {
365 	uint32_t h = *chain;
366 	for (;h < tdb->hash_size;h++) {
367 		/* the +1 takes account of the freelist */
368 		if (0 != tdb->transaction->hash_heads[h+1]) {
369 			break;
370 		}
371 	}
372 	(*chain) = h;
373 }
374 
375 /*
376   out of bounds check during a transaction
377 */
transaction_oob(struct tdb_context * tdb,tdb_off_t off,tdb_len_t len,int probe)378 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
379 			   tdb_len_t len, int probe)
380 {
381 	/*
382 	 * This duplicates functionality from tdb_oob(). Don't remove:
383 	 * we still have direct callers of tdb->methods->tdb_oob()
384 	 * inside transaction.c.
385 	 */
386 	if (off + len >= off && off + len <= tdb->map_size) {
387 		return 0;
388 	}
389 	tdb->ecode = TDB_ERR_IO;
390 	return -1;
391 }
392 
393 /*
394   transaction version of tdb_expand().
395 */
transaction_expand_file(struct tdb_context * tdb,tdb_off_t size,tdb_off_t addition)396 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
397 				   tdb_off_t addition)
398 {
399 	const char buf_zero[8192] = {0};
400 	size_t buf_len = sizeof(buf_zero);
401 
402 	while (addition > 0) {
403 		size_t n = MIN(addition, buf_len);
404 		int ret;
405 
406 		ret = transaction_write(tdb, size, buf_zero, n);
407 		if (ret != 0) {
408 			return ret;
409 		}
410 
411 		addition -= n;
412 		size += n;
413 	}
414 
415 	tdb->transaction->expanded = true;
416 
417 	return 0;
418 }
419 
420 static const struct tdb_methods transaction_methods = {
421 	transaction_read,
422 	transaction_write,
423 	transaction_next_hash_chain,
424 	transaction_oob,
425 	transaction_expand_file,
426 };
427 
428 /*
429  * Is a transaction currently active on this context?
430  *
431  */
tdb_transaction_active(struct tdb_context * tdb)432 _PUBLIC_ bool tdb_transaction_active(struct tdb_context *tdb)
433 {
434 	return (tdb->transaction != NULL);
435 }
436 
437 /*
438   start a tdb transaction. No token is returned, as only a single
439   transaction is allowed to be pending per tdb_context
440 */
_tdb_transaction_start(struct tdb_context * tdb,enum tdb_lock_flags lockflags)441 static int _tdb_transaction_start(struct tdb_context *tdb,
442 				  enum tdb_lock_flags lockflags)
443 {
444 	/* some sanity checks */
445 	if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
446 	    || tdb->traverse_read) {
447 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
448 		tdb->ecode = TDB_ERR_EINVAL;
449 		return -1;
450 	}
451 
452 	/* cope with nested tdb_transaction_start() calls */
453 	if (tdb->transaction != NULL) {
454 		if (!(tdb->flags & TDB_ALLOW_NESTING)) {
455 			tdb->ecode = TDB_ERR_NESTING;
456 			return -1;
457 		}
458 		tdb->transaction->nesting++;
459 		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
460 			 tdb->transaction->nesting));
461 		return 0;
462 	}
463 
464 	if (tdb_have_extra_locks(tdb)) {
465 		/* the caller must not have any locks when starting a
466 		   transaction as otherwise we'll be screwed by lack
467 		   of nested locks in posix */
468 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
469 		tdb->ecode = TDB_ERR_LOCK;
470 		return -1;
471 	}
472 
473 	if (tdb->travlocks.next != NULL) {
474 		/* you cannot use transactions inside a traverse (although you can use
475 		   traverse inside a transaction) as otherwise you can end up with
476 		   deadlock */
477 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
478 		tdb->ecode = TDB_ERR_LOCK;
479 		return -1;
480 	}
481 
482 	tdb->transaction = (struct tdb_transaction *)
483 		calloc(sizeof(struct tdb_transaction), 1);
484 	if (tdb->transaction == NULL) {
485 		tdb->ecode = TDB_ERR_OOM;
486 		return -1;
487 	}
488 
489 	/* a page at a time seems like a reasonable compromise between compactness and efficiency */
490 	tdb->transaction->block_size = tdb->page_size;
491 
492 	/* get the transaction write lock. This is a blocking lock. As
493 	   discussed with Volker, there are a number of ways we could
494 	   make this async, which we will probably do in the future */
495 	if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
496 		SAFE_FREE(tdb->transaction->blocks);
497 		SAFE_FREE(tdb->transaction);
498 		if ((lockflags & TDB_LOCK_WAIT) == 0) {
499 			tdb->ecode = TDB_ERR_NOLOCK;
500 		} else {
501 			TDB_LOG((tdb, TDB_DEBUG_ERROR,
502 				 "tdb_transaction_start: "
503 				 "failed to get transaction lock\n"));
504 		}
505 		return -1;
506 	}
507 
508 	/* get a read lock from the freelist to the end of file. This
509 	   is upgraded to a write lock during the commit */
510 	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
511 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
512 		goto fail_allrecord_lock;
513 	}
514 
515 	/* setup a copy of the hash table heads so the hash scan in
516 	   traverse can be fast */
517 	tdb->transaction->hash_heads = (uint32_t *)
518 		calloc(tdb->hash_size+1, sizeof(uint32_t));
519 	if (tdb->transaction->hash_heads == NULL) {
520 		tdb->ecode = TDB_ERR_OOM;
521 		goto fail;
522 	}
523 	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
524 				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
525 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
526 		tdb->ecode = TDB_ERR_IO;
527 		goto fail;
528 	}
529 
530 	/* make sure we know about any file expansions already done by
531 	   anyone else */
532 	tdb_oob(tdb, tdb->map_size, 1, 1);
533 	tdb->transaction->old_map_size = tdb->map_size;
534 
535 	/* finally hook the io methods, replacing them with
536 	   transaction specific methods */
537 	tdb->transaction->io_methods = tdb->methods;
538 	tdb->methods = &transaction_methods;
539 
540 	/* Trace at the end, so we get sequence number correct. */
541 	tdb_trace(tdb, "tdb_transaction_start");
542 	return 0;
543 
544 fail:
545 	tdb_allrecord_unlock(tdb, F_RDLCK, false);
546 fail_allrecord_lock:
547 	tdb_transaction_unlock(tdb, F_WRLCK);
548 	SAFE_FREE(tdb->transaction->blocks);
549 	SAFE_FREE(tdb->transaction->hash_heads);
550 	SAFE_FREE(tdb->transaction);
551 	return -1;
552 }
553 
tdb_transaction_start(struct tdb_context * tdb)554 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
555 {
556 	return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
557 }
558 
tdb_transaction_start_nonblock(struct tdb_context * tdb)559 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
560 {
561 	return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
562 }
563 
564 /*
565   sync to disk
566 */
transaction_sync(struct tdb_context * tdb,tdb_off_t offset,tdb_len_t length)567 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
568 {
569 	if (tdb->flags & TDB_NOSYNC) {
570 		return 0;
571 	}
572 
573 #ifdef HAVE_FDATASYNC
574 	if (fdatasync(tdb->fd) != 0) {
575 #else
576 	if (fsync(tdb->fd) != 0) {
577 #endif
578 		tdb->ecode = TDB_ERR_IO;
579 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
580 		return -1;
581 	}
582 #ifdef HAVE_MMAP
583 	if (tdb->map_ptr) {
584 		tdb_off_t moffset = offset & ~(tdb->page_size-1);
585 		if (msync(moffset + (char *)tdb->map_ptr,
586 			  length + (offset - moffset), MS_SYNC) != 0) {
587 			tdb->ecode = TDB_ERR_IO;
588 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
589 				 strerror(errno)));
590 			return -1;
591 		}
592 	}
593 #endif
594 	return 0;
595 }
596 
597 
598 static int _tdb_transaction_cancel(struct tdb_context *tdb)
599 {
600 	uint32_t i;
601 	int ret = 0;
602 
603 	if (tdb->transaction == NULL) {
604 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
605 		return -1;
606 	}
607 
608 	if (tdb->transaction->nesting != 0) {
609 		tdb->transaction->transaction_error = 1;
610 		tdb->transaction->nesting--;
611 		return 0;
612 	}
613 
614 	tdb->map_size = tdb->transaction->old_map_size;
615 
616 	/* free all the transaction blocks */
617 	for (i=0;i<tdb->transaction->num_blocks;i++) {
618 		if ((tdb->transaction->blocks != NULL) &&
619 		    tdb->transaction->blocks[i] != NULL) {
620 			free(tdb->transaction->blocks[i]);
621 		}
622 	}
623 	SAFE_FREE(tdb->transaction->blocks);
624 
625 	if (tdb->transaction->magic_offset) {
626 		const struct tdb_methods *methods = tdb->transaction->io_methods;
627 		const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
628 
629 		/* remove the recovery marker */
630 		if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
631 		transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
632 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
633 			ret = -1;
634 		}
635 	}
636 
637 	/* This also removes the OPEN_LOCK, if we have it. */
638 	tdb_release_transaction_locks(tdb);
639 
640 	/* restore the normal io methods */
641 	tdb->methods = tdb->transaction->io_methods;
642 
643 	SAFE_FREE(tdb->transaction->hash_heads);
644 	SAFE_FREE(tdb->transaction);
645 
646 	return ret;
647 }
648 
649 /*
650   cancel the current transaction
651 */
652 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
653 {
654 	tdb_trace(tdb, "tdb_transaction_cancel");
655 	return _tdb_transaction_cancel(tdb);
656 }
657 
658 /*
659   work out how much space the linearised recovery data will consume
660 */
661 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
662 {
663 	tdb_len_t recovery_size = 0;
664 	uint32_t i;
665 
666 	recovery_size = sizeof(uint32_t);
667 	for (i=0;i<tdb->transaction->num_blocks;i++) {
668 		tdb_len_t block_size;
669 		if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
670 			break;
671 		}
672 		if (tdb->transaction->blocks[i] == NULL) {
673 			continue;
674 		}
675 		if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
676 				   &recovery_size)) {
677 			return false;
678 		}
679 		if (i == tdb->transaction->num_blocks-1) {
680 			block_size = tdb->transaction->last_block_size;
681 		} else {
682 			block_size =  tdb->transaction->block_size;
683 		}
684 		if (!tdb_add_len_t(recovery_size, block_size,
685 				   &recovery_size)) {
686 			return false;
687 		}
688 	}
689 
690 	*result = recovery_size;
691 	return true;
692 }
693 
694 int tdb_recovery_area(struct tdb_context *tdb,
695 		      const struct tdb_methods *methods,
696 		      tdb_off_t *recovery_offset,
697 		      struct tdb_record *rec)
698 {
699 	int ret;
700 
701 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
702 		return -1;
703 	}
704 
705 	if (*recovery_offset == 0) {
706 		rec->rec_len = 0;
707 		return 0;
708 	}
709 
710 	if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
711 			      DOCONV()) == -1) {
712 		return -1;
713 	}
714 
715 	/* ignore invalid recovery regions: can happen in crash */
716 	if (rec->magic != TDB_RECOVERY_MAGIC &&
717 	    rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
718 		*recovery_offset = 0;
719 		rec->rec_len = 0;
720 	}
721 
722 	ret = methods->tdb_oob(tdb, *recovery_offset, rec->rec_len, 1);
723 	if (ret == -1) {
724 		*recovery_offset = 0;
725 		rec->rec_len = 0;
726 	}
727 
728 	return 0;
729 }
730 
731 /*
732   allocate the recovery area, or use an existing recovery area if it is
733   large enough
734 */
735 static int tdb_recovery_allocate(struct tdb_context *tdb,
736 				 tdb_len_t *recovery_size,
737 				 tdb_off_t *recovery_offset,
738 				 tdb_len_t *recovery_max_size)
739 {
740 	struct tdb_record rec;
741 	const struct tdb_methods *methods = tdb->transaction->io_methods;
742 	tdb_off_t recovery_head, new_end;
743 
744 	if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
745 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
746 		return -1;
747 	}
748 
749 	if (!tdb_recovery_size(tdb, recovery_size)) {
750 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
751 			 "overflow recovery size\n"));
752 		return -1;
753 	}
754 
755 	/* Existing recovery area? */
756 	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
757 		/* it fits in the existing area */
758 		*recovery_max_size = rec.rec_len;
759 		*recovery_offset = recovery_head;
760 		return 0;
761 	}
762 
763 	/* If recovery area in middle of file, we need a new one. */
764 	if (recovery_head == 0
765 	    || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
766 		/* we need to free up the old recovery area, then allocate a
767 		   new one at the end of the file. Note that we cannot use
768 		   tdb_allocate() to allocate the new one as that might return
769 		   us an area that is being currently used (as of the start of
770 		   the transaction) */
771 		if (recovery_head) {
772 			if (tdb_free(tdb, recovery_head, &rec) == -1) {
773 				TDB_LOG((tdb, TDB_DEBUG_FATAL,
774 					 "tdb_recovery_allocate: failed to"
775 					 " free previous recovery area\n"));
776 				return -1;
777 			}
778 
779 			/* the tdb_free() call might have increased
780 			 * the recovery size */
781 			if (!tdb_recovery_size(tdb, recovery_size)) {
782 				TDB_LOG((tdb, TDB_DEBUG_FATAL,
783 					 "tdb_recovery_allocate: "
784 					 "overflow recovery size\n"));
785 				return -1;
786 			}
787 		}
788 
789 		/* New head will be at end of file. */
790 		recovery_head = tdb->map_size;
791 	}
792 
793 	/* Now we know where it will be. */
794 	*recovery_offset = recovery_head;
795 
796 	/* Expand by more than we need, so we don't do it often. */
797 	*recovery_max_size = tdb_expand_adjust(tdb->map_size,
798 					       *recovery_size,
799 					       tdb->page_size)
800 		- sizeof(rec);
801 
802 	if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
803 	    !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
804 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
805 			 "overflow recovery area\n"));
806 		return -1;
807 	}
808 
809 	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
810 				     new_end - tdb->transaction->old_map_size)
811 	    == -1) {
812 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
813 		return -1;
814 	}
815 
816 	/* remap the file (if using mmap) */
817 	methods->tdb_oob(tdb, tdb->map_size, 1, 1);
818 
819 	/* we have to reset the old map size so that we don't try to expand the file
820 	   again in the transaction commit, which would destroy the recovery area */
821 	tdb->transaction->old_map_size = tdb->map_size;
822 
823 	/* write the recovery header offset and sync - we can sync without a race here
824 	   as the magic ptr in the recovery record has not been set */
825 	CONVERT(recovery_head);
826 	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
827 			       &recovery_head, sizeof(tdb_off_t)) == -1) {
828 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
829 		return -1;
830 	}
831 	if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
832 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
833 		return -1;
834 	}
835 
836 	return 0;
837 }
838 
839 
840 /*
841   setup the recovery data that will be used on a crash during commit
842 */
843 static int transaction_setup_recovery(struct tdb_context *tdb,
844 				      tdb_off_t *magic_offset)
845 {
846 	tdb_len_t recovery_size;
847 	unsigned char *data, *p;
848 	const struct tdb_methods *methods = tdb->transaction->io_methods;
849 	struct tdb_record *rec;
850 	tdb_off_t recovery_offset, recovery_max_size;
851 	tdb_off_t old_map_size = tdb->transaction->old_map_size;
852 	uint32_t magic, tailer;
853 	uint32_t i;
854 
855 	/*
856 	  check that the recovery area has enough space
857 	*/
858 	if (tdb_recovery_allocate(tdb, &recovery_size,
859 				  &recovery_offset, &recovery_max_size) == -1) {
860 		return -1;
861 	}
862 
863 	rec = malloc(recovery_size + sizeof(*rec));
864 	if (rec == NULL) {
865 		tdb->ecode = TDB_ERR_OOM;
866 		return -1;
867 	}
868 
869 	memset(rec, 0, sizeof(*rec));
870 
871 	rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
872 	rec->data_len = recovery_size;
873 	rec->rec_len  = recovery_max_size;
874 	rec->key_len  = old_map_size;
875 	CONVERT(*rec);
876 
877 	data = (unsigned char *)rec;
878 
879 	/* build the recovery data into a single blob to allow us to do a single
880 	   large write, which should be more efficient */
881 	p = data + sizeof(*rec);
882 	for (i=0;i<tdb->transaction->num_blocks;i++) {
883 		tdb_off_t offset;
884 		tdb_len_t length;
885 
886 		if (tdb->transaction->blocks[i] == NULL) {
887 			continue;
888 		}
889 
890 		offset = i * tdb->transaction->block_size;
891 		length = tdb->transaction->block_size;
892 		if (i == tdb->transaction->num_blocks-1) {
893 			length = tdb->transaction->last_block_size;
894 		}
895 
896 		if (offset >= old_map_size) {
897 			continue;
898 		}
899 		if (offset + length > tdb->transaction->old_map_size) {
900 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
901 			free(data);
902 			tdb->ecode = TDB_ERR_CORRUPT;
903 			return -1;
904 		}
905 		memcpy(p, &offset, 4);
906 		memcpy(p+4, &length, 4);
907 		if (DOCONV()) {
908 			tdb_convert(p, 8);
909 		}
910 		/* the recovery area contains the old data, not the
911 		   new data, so we have to call the original tdb_read
912 		   method to get it */
913 		if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
914 			free(data);
915 			tdb->ecode = TDB_ERR_IO;
916 			return -1;
917 		}
918 		p += 8 + length;
919 	}
920 
921 	/* and the tailer */
922 	tailer = sizeof(*rec) + recovery_max_size;
923 	memcpy(p, &tailer, 4);
924 	if (DOCONV()) {
925 		tdb_convert(p, 4);
926 	}
927 
928 	/* write the recovery data to the recovery area */
929 	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
930 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
931 		free(data);
932 		tdb->ecode = TDB_ERR_IO;
933 		return -1;
934 	}
935 	if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
936 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
937 		free(data);
938 		tdb->ecode = TDB_ERR_IO;
939 		return -1;
940 	}
941 
942 	/* as we don't have ordered writes, we have to sync the recovery
943 	   data before we update the magic to indicate that the recovery
944 	   data is present */
945 	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
946 		free(data);
947 		return -1;
948 	}
949 
950 	free(data);
951 
952 	magic = TDB_RECOVERY_MAGIC;
953 	CONVERT(magic);
954 
955 	*magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
956 
957 	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
958 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
959 		tdb->ecode = TDB_ERR_IO;
960 		return -1;
961 	}
962 	if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
963 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
964 		tdb->ecode = TDB_ERR_IO;
965 		return -1;
966 	}
967 
968 	/* ensure the recovery magic marker is on disk */
969 	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
970 		return -1;
971 	}
972 
973 	return 0;
974 }
975 
976 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
977 {
978 	const struct tdb_methods *methods;
979 
980 	if (tdb->transaction == NULL) {
981 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
982 		return -1;
983 	}
984 
985 	if (tdb->transaction->prepared) {
986 		tdb->ecode = TDB_ERR_EINVAL;
987 		_tdb_transaction_cancel(tdb);
988 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
989 		return -1;
990 	}
991 
992 	if (tdb->transaction->transaction_error) {
993 		tdb->ecode = TDB_ERR_IO;
994 		_tdb_transaction_cancel(tdb);
995 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
996 		return -1;
997 	}
998 
999 
1000 	if (tdb->transaction->nesting != 0) {
1001 		return 0;
1002 	}
1003 
1004 	/* check for a null transaction */
1005 	if (tdb->transaction->blocks == NULL) {
1006 		return 0;
1007 	}
1008 
1009 	methods = tdb->transaction->io_methods;
1010 
1011 	/* if there are any locks pending then the caller has not
1012 	   nested their locks properly, so fail the transaction */
1013 	if (tdb_have_extra_locks(tdb)) {
1014 		tdb->ecode = TDB_ERR_LOCK;
1015 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
1016 		_tdb_transaction_cancel(tdb);
1017 		return -1;
1018 	}
1019 
1020 	/* upgrade the main transaction lock region to a write lock */
1021 	if (tdb_allrecord_upgrade(tdb) == -1) {
1022 		if (tdb->ecode == TDB_ERR_RDONLY && tdb->read_only) {
1023 			TDB_LOG((tdb, TDB_DEBUG_ERROR,
1024 				 "tdb_transaction_prepare_commit: "
1025 				 "failed to upgrade hash locks: "
1026 				 "database is read only\n"));
1027 		} else if (tdb->ecode == TDB_ERR_RDONLY
1028 			   && tdb->traverse_read) {
1029 			TDB_LOG((tdb, TDB_DEBUG_ERROR,
1030 				 "tdb_transaction_prepare_commit: "
1031 				 "failed to upgrade hash locks: "
1032 				 "a database traverse is in progress\n"));
1033 		} else {
1034 			TDB_LOG((tdb, TDB_DEBUG_ERROR,
1035 				 "tdb_transaction_prepare_commit: "
1036 				 "failed to upgrade hash locks: %s\n",
1037 				 tdb_errorstr(tdb)));
1038 		}
1039 		_tdb_transaction_cancel(tdb);
1040 		return -1;
1041 	}
1042 
1043 	/* get the open lock - this prevents new users attaching to the database
1044 	   during the commit */
1045 	if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
1046 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
1047 		_tdb_transaction_cancel(tdb);
1048 		return -1;
1049 	}
1050 
1051 	/* write the recovery data to the end of the file */
1052 	if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1053 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1054 		_tdb_transaction_cancel(tdb);
1055 		return -1;
1056 	}
1057 
1058 	tdb->transaction->prepared = true;
1059 
1060 	/* expand the file to the new size if needed */
1061 	if (tdb->map_size != tdb->transaction->old_map_size) {
1062 		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1063 					     tdb->map_size -
1064 					     tdb->transaction->old_map_size) == -1) {
1065 			tdb->ecode = TDB_ERR_IO;
1066 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1067 			_tdb_transaction_cancel(tdb);
1068 			return -1;
1069 		}
1070 		tdb->map_size = tdb->transaction->old_map_size;
1071 		methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1072 	}
1073 
1074 	/* Keep the open lock until the actual commit */
1075 
1076 	return 0;
1077 }
1078 
1079 /*
1080    prepare to commit the current transaction
1081 */
1082 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1083 {
1084 	tdb_trace(tdb, "tdb_transaction_prepare_commit");
1085 	return _tdb_transaction_prepare_commit(tdb);
1086 }
1087 
1088 /* A repack is worthwhile if the largest is less than half total free. */
1089 static bool repack_worthwhile(struct tdb_context *tdb)
1090 {
1091 	tdb_off_t ptr;
1092 	struct tdb_record rec;
1093 	tdb_len_t total = 0, largest = 0;
1094 
1095 	if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1096 		return false;
1097 	}
1098 
1099 	while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1100 		total += rec.rec_len;
1101 		if (rec.rec_len > largest) {
1102 			largest = rec.rec_len;
1103 		}
1104 		ptr = rec.next;
1105 	}
1106 
1107 	return total > largest * 2;
1108 }
1109 
1110 /*
1111   commit the current transaction
1112 */
1113 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1114 {
1115 	const struct tdb_methods *methods;
1116 	uint32_t i;
1117 	bool need_repack = false;
1118 
1119 	if (tdb->transaction == NULL) {
1120 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1121 		return -1;
1122 	}
1123 
1124 	tdb_trace(tdb, "tdb_transaction_commit");
1125 
1126 	if (tdb->transaction->transaction_error) {
1127 		tdb->ecode = TDB_ERR_IO;
1128 		_tdb_transaction_cancel(tdb);
1129 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1130 		return -1;
1131 	}
1132 
1133 
1134 	if (tdb->transaction->nesting != 0) {
1135 		tdb->transaction->nesting--;
1136 		return 0;
1137 	}
1138 
1139 	/* check for a null transaction */
1140 	if (tdb->transaction->blocks == NULL) {
1141 		_tdb_transaction_cancel(tdb);
1142 		return 0;
1143 	}
1144 
1145 	if (!tdb->transaction->prepared) {
1146 		int ret = _tdb_transaction_prepare_commit(tdb);
1147 		if (ret)
1148 			return ret;
1149 	}
1150 
1151 	methods = tdb->transaction->io_methods;
1152 
1153 	/* perform all the writes */
1154 	for (i=0;i<tdb->transaction->num_blocks;i++) {
1155 		tdb_off_t offset;
1156 		tdb_len_t length;
1157 
1158 		if (tdb->transaction->blocks[i] == NULL) {
1159 			continue;
1160 		}
1161 
1162 		offset = i * tdb->transaction->block_size;
1163 		length = tdb->transaction->block_size;
1164 		if (i == tdb->transaction->num_blocks-1) {
1165 			length = tdb->transaction->last_block_size;
1166 		}
1167 
1168 		if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1169 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1170 
1171 			/* we've overwritten part of the data and
1172 			   possibly expanded the file, so we need to
1173 			   run the crash recovery code */
1174 			tdb->methods = methods;
1175 			tdb_transaction_recover(tdb);
1176 
1177 			_tdb_transaction_cancel(tdb);
1178 
1179 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1180 			return -1;
1181 		}
1182 		SAFE_FREE(tdb->transaction->blocks[i]);
1183 	}
1184 
1185 	/* Do this before we drop lock or blocks. */
1186 	if (tdb->transaction->expanded) {
1187 		need_repack = repack_worthwhile(tdb);
1188 	}
1189 
1190 	SAFE_FREE(tdb->transaction->blocks);
1191 	tdb->transaction->num_blocks = 0;
1192 
1193 	/* ensure the new data is on disk */
1194 	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1195 		return -1;
1196 	}
1197 
1198 	/*
1199 	  TODO: maybe write to some dummy hdr field, or write to magic
1200 	  offset without mmap, before the last sync, instead of the
1201 	  utime() call
1202 	*/
1203 
1204 	/* on some systems (like Linux 2.6.x) changes via mmap/msync
1205 	   don't change the mtime of the file, this means the file may
1206 	   not be backed up (as tdb rounding to block sizes means that
1207 	   file size changes are quite rare too). The following forces
1208 	   mtime changes when a transaction completes */
1209 #ifdef HAVE_UTIME
1210 	utime(tdb->name, NULL);
1211 #endif
1212 
1213 	/* use a transaction cancel to free memory and remove the
1214 	   transaction locks */
1215 	_tdb_transaction_cancel(tdb);
1216 
1217 	if (need_repack) {
1218 		int ret = tdb_repack(tdb);
1219 		if (ret != 0) {
1220 			TDB_LOG((tdb, TDB_DEBUG_FATAL,
1221 				 __location__ " Failed to repack database (not fatal)\n"));
1222 		}
1223 		/*
1224 		 * Ignore the error.
1225 		 *
1226 		 * Why?
1227 		 *
1228 		 * We just committed to the DB above, so anything
1229 		 * written during the transaction is committed, the
1230 		 * caller needs to know that the long-term state was
1231 		 * successfully modified.
1232 		 *
1233 		 * tdb_repack is an optimization that can fail for
1234 		 * reasons like lock ordering and we cannot recover
1235 		 * the transaction lock at this point, having released
1236 		 * it above.
1237 		 *
1238 		 * If we return a failure the caller thinks the
1239 		 * transaction was rolled back.
1240 		 */
1241 	}
1242 
1243 	return 0;
1244 }
1245 
1246 
1247 /*
1248   recover from an aborted transaction. Must be called with exclusive
1249   database write access already established (including the open
1250   lock to prevent new processes attaching)
1251 */
1252 int tdb_transaction_recover(struct tdb_context *tdb)
1253 {
1254 	tdb_off_t recovery_head, recovery_eof;
1255 	unsigned char *data, *p;
1256 	uint32_t zero = 0;
1257 	struct tdb_record rec;
1258 
1259 	/* find the recovery area */
1260 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1261 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1262 		tdb->ecode = TDB_ERR_IO;
1263 		return -1;
1264 	}
1265 
1266 	if (recovery_head == 0) {
1267 		/* we have never allocated a recovery record */
1268 		return 0;
1269 	}
1270 
1271 	/* read the recovery record */
1272 	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1273 				   sizeof(rec), DOCONV()) == -1) {
1274 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1275 		tdb->ecode = TDB_ERR_IO;
1276 		return -1;
1277 	}
1278 
1279 	if (rec.magic != TDB_RECOVERY_MAGIC) {
1280 		/* there is no valid recovery data */
1281 		return 0;
1282 	}
1283 
1284 	if (tdb->read_only) {
1285 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1286 		tdb->ecode = TDB_ERR_CORRUPT;
1287 		return -1;
1288 	}
1289 
1290 	recovery_eof = rec.key_len;
1291 
1292 	data = (unsigned char *)malloc(rec.data_len);
1293 	if (data == NULL) {
1294 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1295 		tdb->ecode = TDB_ERR_OOM;
1296 		return -1;
1297 	}
1298 
1299 	/* read the full recovery data */
1300 	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1301 				   rec.data_len, 0) == -1) {
1302 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1303 		tdb->ecode = TDB_ERR_IO;
1304 		return -1;
1305 	}
1306 
1307 	/* recover the file data */
1308 	p = data;
1309 	while (p+8 < data + rec.data_len) {
1310 		uint32_t ofs, len;
1311 		if (DOCONV()) {
1312 			tdb_convert(p, 8);
1313 		}
1314 		memcpy(&ofs, p, 4);
1315 		memcpy(&len, p+4, 4);
1316 
1317 		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1318 			free(data);
1319 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1320 			tdb->ecode = TDB_ERR_IO;
1321 			return -1;
1322 		}
1323 		p += 8 + len;
1324 	}
1325 
1326 	free(data);
1327 
1328 	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1329 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1330 		tdb->ecode = TDB_ERR_IO;
1331 		return -1;
1332 	}
1333 
1334 	/* if the recovery area is after the recovered eof then remove it */
1335 	if (recovery_eof <= recovery_head) {
1336 		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1337 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1338 			tdb->ecode = TDB_ERR_IO;
1339 			return -1;
1340 		}
1341 	}
1342 
1343 	/* remove the recovery magic */
1344 	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1345 			  &zero) == -1) {
1346 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1347 		tdb->ecode = TDB_ERR_IO;
1348 		return -1;
1349 	}
1350 
1351 	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1352 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1353 		tdb->ecode = TDB_ERR_IO;
1354 		return -1;
1355 	}
1356 
1357 	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1358 		 recovery_eof));
1359 
1360 	/* all done */
1361 	return 0;
1362 }
1363 
1364 /* Any I/O failures we say "needs recovery". */
1365 bool tdb_needs_recovery(struct tdb_context *tdb)
1366 {
1367 	tdb_off_t recovery_head;
1368 	struct tdb_record rec;
1369 
1370 	/* find the recovery area */
1371 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1372 		return true;
1373 	}
1374 
1375 	if (recovery_head == 0) {
1376 		/* we have never allocated a recovery record */
1377 		return false;
1378 	}
1379 
1380 	/* read the recovery record */
1381 	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1382 				   sizeof(rec), DOCONV()) == -1) {
1383 		return true;
1384 	}
1385 
1386 	return (rec.magic == TDB_RECOVERY_MAGIC);
1387 }
1388