1  /*
2    Unix SMB/CIFS implementation.
3 
4    trivial database library
5 
6    Copyright (C) Andrew Tridgell              2005
7 
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11 
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16 
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21 
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26 
27 #include "tdb_private.h"
28 
29 /*
30   transaction design:
31 
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36 
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41 
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46 
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52 
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56 
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59 
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63 
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67 
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71 
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78 
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84 
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88 
89 */
90 
91 
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96 	/* we keep a mirrored copy of the tdb hash heads here so
97 	   tdb_next_hash_chain() can operate efficiently */
98 	u32 *hash_heads;
99 
100 	/* the original io methods - used to do IOs to the real db */
101 	const struct tdb_methods *io_methods;
102 
103 	/* the list of transaction elements. We use a doubly linked
104 	   list with a last pointer to allow us to keep the list
105 	   ordered, with first element at the front of the list. It
106 	   needs to be doubly linked as the read/write traversals need
107 	   to be backwards, while the commit needs to be forwards */
108 	struct tdb_transaction_el {
109 		struct tdb_transaction_el *next, *prev;
110 		tdb_off_t offset;
111 		tdb_len_t length;
112 		unsigned char *data;
113 	} *elements, *elements_last;
114 
115 	/* non-zero when an internal transaction error has
116 	   occurred. All write operations will then fail until the
117 	   transaction is ended */
118 	int transaction_error;
119 
120 	/* when inside a transaction we need to keep track of any
121 	   nested tdb_transaction_start() calls, as these are allowed,
122 	   but don't create a new transaction */
123 	int nesting;
124 
125 	/* old file size before transaction */
126 	tdb_len_t old_map_size;
127 };
128 
129 
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
transaction_read(struct tdb_context * tdb,tdb_off_t off,void * buf,tdb_len_t len,int cv)134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
135 			    tdb_len_t len, int cv)
136 {
137 	struct tdb_transaction_el *el;
138 
139 	/* we need to walk the list backwards to get the most recent data */
140 	for (el=tdb->transaction->elements_last;el;el=el->prev) {
141 		tdb_len_t partial;
142 
143 		if (off+len <= el->offset) {
144 			continue;
145 		}
146 		if (off >= el->offset + el->length) {
147 			continue;
148 		}
149 
150 		/* an overlapping read - needs to be split into up to
151 		   2 reads and a memcpy */
152 		if (off < el->offset) {
153 			partial = el->offset - off;
154 			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155 				goto fail;
156 			}
157 			len -= partial;
158 			off += partial;
159 			buf = (void *)(partial + (char *)buf);
160 		}
161 		if (off + len <= el->offset + el->length) {
162 			partial = len;
163 		} else {
164 			partial = el->offset + el->length - off;
165 		}
166 		memcpy(buf, el->data + (off - el->offset), partial);
167 		if (cv) {
168 			tdb_convert(buf, len);
169 		}
170 		len -= partial;
171 		off += partial;
172 		buf = (void *)(partial + (char *)buf);
173 
174 		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175 			goto fail;
176 		}
177 
178 		return 0;
179 	}
180 
181 	/* its not in the transaction elements - do a real read */
182 	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183 
184 fail:
185 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186 	tdb->ecode = TDB_ERR_IO;
187 	tdb->transaction->transaction_error = 1;
188 	return -1;
189 }
190 
191 
192 /*
193   write while in a transaction
194 */
transaction_write(struct tdb_context * tdb,tdb_off_t off,const void * buf,tdb_len_t len)195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
196 			     const void *buf, tdb_len_t len)
197 {
198 	struct tdb_transaction_el *el, *best_el=NULL;
199 
200 	if (len == 0) {
201 		return 0;
202 	}
203 
204 	/* if the write is to a hash head, then update the transaction
205 	   hash heads */
206 	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207 	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208 		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209 		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210 	}
211 
212 	/* first see if we can replace an existing entry */
213 	for (el=tdb->transaction->elements_last;el;el=el->prev) {
214 		tdb_len_t partial;
215 
216 		if (best_el == NULL && off == el->offset+el->length) {
217 			best_el = el;
218 		}
219 
220 		if (off+len <= el->offset) {
221 			continue;
222 		}
223 		if (off >= el->offset + el->length) {
224 			continue;
225 		}
226 
227 		/* an overlapping write - needs to be split into up to
228 		   2 writes and a memcpy */
229 		if (off < el->offset) {
230 			partial = el->offset - off;
231 			if (transaction_write(tdb, off, buf, partial) != 0) {
232 				goto fail;
233 			}
234 			len -= partial;
235 			off += partial;
236 			buf = (const void *)(partial + (const char *)buf);
237 		}
238 		if (off + len <= el->offset + el->length) {
239 			partial = len;
240 		} else {
241 			partial = el->offset + el->length - off;
242 		}
243 		memcpy(el->data + (off - el->offset), buf, partial);
244 		len -= partial;
245 		off += partial;
246 		buf = (const void *)(partial + (const char *)buf);
247 
248 		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249 			goto fail;
250 		}
251 
252 		return 0;
253 	}
254 
255 	/* see if we can append the new entry to an existing entry */
256 	if (best_el && best_el->offset + best_el->length == off &&
257 	    (off+len < tdb->transaction->old_map_size ||
258 	     off > tdb->transaction->old_map_size)) {
259 		unsigned char *data = best_el->data;
260 		el = best_el;
261 		el->data = (unsigned char *)realloc(el->data,
262 						    el->length + len);
263 		if (el->data == NULL) {
264 			tdb->ecode = TDB_ERR_OOM;
265 			tdb->transaction->transaction_error = 1;
266 			el->data = data;
267 			return -1;
268 		}
269 		if (buf) {
270 			memcpy(el->data + el->length, buf, len);
271 		} else {
272 			memset(el->data + el->length, TDB_PAD_BYTE, len);
273 		}
274 		el->length += len;
275 		return 0;
276 	}
277 
278 	/* add a new entry at the end of the list */
279 	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
280 	if (el == NULL) {
281 		tdb->ecode = TDB_ERR_OOM;
282 		tdb->transaction->transaction_error = 1;
283 		return -1;
284 	}
285 	el->next = NULL;
286 	el->prev = tdb->transaction->elements_last;
287 	el->offset = off;
288 	el->length = len;
289 	el->data = (unsigned char *)malloc(len);
290 	if (el->data == NULL) {
291 		free(el);
292 		tdb->ecode = TDB_ERR_OOM;
293 		tdb->transaction->transaction_error = 1;
294 		return -1;
295 	}
296 	if (buf) {
297 		memcpy(el->data, buf, len);
298 	} else {
299 		memset(el->data, TDB_PAD_BYTE, len);
300 	}
301 	if (el->prev) {
302 		el->prev->next = el;
303 	} else {
304 		tdb->transaction->elements = el;
305 	}
306 	tdb->transaction->elements_last = el;
307 	return 0;
308 
309 fail:
310 	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
311 	tdb->ecode = TDB_ERR_IO;
312 	tdb->transaction->transaction_error = 1;
313 	return -1;
314 }
315 
316 /*
317   accelerated hash chain head search, using the cached hash heads
318 */
transaction_next_hash_chain(struct tdb_context * tdb,u32 * chain)319 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
320 {
321 	u32 h = *chain;
322 	for (;h < tdb->header.hash_size;h++) {
323 		/* the +1 takes account of the freelist */
324 		if (0 != tdb->transaction->hash_heads[h+1]) {
325 			break;
326 		}
327 	}
328 	(*chain) = h;
329 }
330 
331 /*
332   out of bounds check during a transaction
333 */
transaction_oob(struct tdb_context * tdb,tdb_off_t len,int probe)334 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 {
336 	if (len <= tdb->map_size) {
337 		return 0;
338 	}
339 	return TDB_ERRCODE(TDB_ERR_IO, -1);
340 }
341 
342 /*
343   transaction version of tdb_expand().
344 */
transaction_expand_file(struct tdb_context * tdb,tdb_off_t size,tdb_off_t addition)345 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
346 				   tdb_off_t addition)
347 {
348 	/* add a write to the transaction elements, so subsequent
349 	   reads see the zero data */
350 	if (transaction_write(tdb, size, NULL, addition) != 0) {
351 		return -1;
352 	}
353 
354 	return 0;
355 }
356 
357 /*
358   brlock during a transaction - ignore them
359 */
transaction_brlock(struct tdb_context * tdb,tdb_off_t offset,int rw_type,int lck_type,int probe,size_t len)360 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
361 		       int rw_type, int lck_type, int probe, size_t len)
362 {
363 	return 0;
364 }
365 
366 static const struct tdb_methods transaction_methods = {
367 	transaction_read,
368 	transaction_write,
369 	transaction_next_hash_chain,
370 	transaction_oob,
371 	transaction_expand_file,
372 	transaction_brlock
373 };
374 
375 
376 /*
377   start a tdb transaction. No token is returned, as only a single
378   transaction is allowed to be pending per tdb_context
379 */
tdb_transaction_start(struct tdb_context * tdb)380 int tdb_transaction_start(struct tdb_context *tdb)
381 {
382 	/* some sanity checks */
383 	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
384 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
385 		tdb->ecode = TDB_ERR_EINVAL;
386 		return -1;
387 	}
388 
389 	/* cope with nested tdb_transaction_start() calls */
390 	if (tdb->transaction != NULL) {
391 		tdb->transaction->nesting++;
392 		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
393 			 tdb->transaction->nesting));
394 		return 0;
395 	}
396 
397 	if (tdb->num_locks != 0 || tdb->global_lock.count) {
398 		/* the caller must not have any locks when starting a
399 		   transaction as otherwise we'll be screwed by lack
400 		   of nested locks in posix */
401 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
402 		tdb->ecode = TDB_ERR_LOCK;
403 		return -1;
404 	}
405 
406 	if (tdb->travlocks.next != NULL) {
407 		/* you cannot use transactions inside a traverse (although you can use
408 		   traverse inside a transaction) as otherwise you can end up with
409 		   deadlock */
410 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
411 		tdb->ecode = TDB_ERR_LOCK;
412 		return -1;
413 	}
414 
415 	tdb->transaction = (struct tdb_transaction *)
416 		calloc(sizeof(struct tdb_transaction), 1);
417 	if (tdb->transaction == NULL) {
418 		tdb->ecode = TDB_ERR_OOM;
419 		return -1;
420 	}
421 
422 	/* get the transaction write lock. This is a blocking lock. As
423 	   discussed with Volker, there are a number of ways we could
424 	   make this async, which we will probably do in the future */
425 	if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
426 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
427 		tdb->ecode = TDB_ERR_LOCK;
428 		SAFE_FREE(tdb->transaction);
429 		return -1;
430 	}
431 
432 	/* get a read lock from the freelist to the end of file. This
433 	   is upgraded to a write lock during the commit */
434 	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
435 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
436 		tdb->ecode = TDB_ERR_LOCK;
437 		goto fail;
438 	}
439 
440 	/* setup a copy of the hash table heads so the hash scan in
441 	   traverse can be fast */
442 	tdb->transaction->hash_heads = (u32 *)
443 		calloc(tdb->header.hash_size+1, sizeof(u32));
444 	if (tdb->transaction->hash_heads == NULL) {
445 		tdb->ecode = TDB_ERR_OOM;
446 		goto fail;
447 	}
448 	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
449 				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
450 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
451 		tdb->ecode = TDB_ERR_IO;
452 		goto fail;
453 	}
454 
455 	/* make sure we know about any file expansions already done by
456 	   anyone else */
457 	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
458 	tdb->transaction->old_map_size = tdb->map_size;
459 
460 	/* finally hook the io methods, replacing them with
461 	   transaction specific methods */
462 	tdb->transaction->io_methods = tdb->methods;
463 	tdb->methods = &transaction_methods;
464 
465 	/* by calling this transaction write here, we ensure that we don't grow the
466 	   transaction linked list due to hash table updates */
467 	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
468 			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
469 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
470 		tdb->ecode = TDB_ERR_IO;
471 		goto fail;
472 	}
473 
474 	return 0;
475 
476 fail:
477 	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478 	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
479 	SAFE_FREE(tdb->transaction->hash_heads);
480 	SAFE_FREE(tdb->transaction);
481 	return -1;
482 }
483 
484 
485 /*
486   cancel the current transaction
487 */
tdb_transaction_cancel(struct tdb_context * tdb)488 int tdb_transaction_cancel(struct tdb_context *tdb)
489 {
490 	if (tdb->transaction == NULL) {
491 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492 		return -1;
493 	}
494 
495 	if (tdb->transaction->nesting != 0) {
496 		tdb->transaction->transaction_error = 1;
497 		tdb->transaction->nesting--;
498 		return 0;
499 	}
500 
501 	tdb->map_size = tdb->transaction->old_map_size;
502 
503 	/* free all the transaction elements */
504 	while (tdb->transaction->elements) {
505 		struct tdb_transaction_el *el = tdb->transaction->elements;
506 		tdb->transaction->elements = el->next;
507 		free(el->data);
508 		free(el);
509 	}
510 
511 	/* remove any global lock created during the transaction */
512 	if (tdb->global_lock.count != 0) {
513 		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514 		tdb->global_lock.count = 0;
515 	}
516 
517 	/* remove any locks created during the transaction */
518 	if (tdb->num_locks != 0) {
519 		int h;
520 		for (h=0;h<tdb->header.hash_size+1;h++) {
521 			if (tdb->locked[h].count != 0) {
522 				tdb_brlock(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
523 				tdb->locked[h].count = 0;
524 			}
525 		}
526 		tdb->num_locks = 0;
527 	}
528 
529 	/* restore the normal io methods */
530 	tdb->methods = tdb->transaction->io_methods;
531 
532 	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
533 	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
534 	SAFE_FREE(tdb->transaction->hash_heads);
535 	SAFE_FREE(tdb->transaction);
536 
537 	return 0;
538 }
539 
540 /*
541   sync to disk
542 */
transaction_sync(struct tdb_context * tdb,tdb_off_t offset,tdb_len_t length)543 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
544 {
545 	if (fsync(tdb->fd) != 0) {
546 		tdb->ecode = TDB_ERR_IO;
547 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
548 		return -1;
549 	}
550 #ifdef MS_SYNC
551 	if (tdb->map_ptr) {
552 		tdb_off_t moffset = offset & ~(tdb->page_size-1);
553 		if (msync(moffset + (char *)tdb->map_ptr,
554 			  length + (offset - moffset), MS_SYNC) != 0) {
555 			tdb->ecode = TDB_ERR_IO;
556 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
557 				 strerror(errno)));
558 			return -1;
559 		}
560 	}
561 #endif
562 	return 0;
563 }
564 
565 
566 /*
567   work out how much space the linearised recovery data will consume
568 */
tdb_recovery_size(struct tdb_context * tdb)569 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
570 {
571 	struct tdb_transaction_el *el;
572 	tdb_len_t recovery_size = 0;
573 
574 	recovery_size = sizeof(u32);
575 	for (el=tdb->transaction->elements;el;el=el->next) {
576 		if (el->offset >= tdb->transaction->old_map_size) {
577 			continue;
578 		}
579 		recovery_size += 2*sizeof(tdb_off_t) + el->length;
580 	}
581 
582 	return recovery_size;
583 }
584 
585 /*
586   allocate the recovery area, or use an existing recovery area if it is
587   large enough
588 */
tdb_recovery_allocate(struct tdb_context * tdb,tdb_len_t * recovery_size,tdb_off_t * recovery_offset,tdb_len_t * recovery_max_size)589 static int tdb_recovery_allocate(struct tdb_context *tdb,
590 				 tdb_len_t *recovery_size,
591 				 tdb_off_t *recovery_offset,
592 				 tdb_len_t *recovery_max_size)
593 {
594 	struct list_struct rec;
595 	const struct tdb_methods *methods = tdb->transaction->io_methods;
596 	tdb_off_t recovery_head;
597 
598 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
599 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
600 		return -1;
601 	}
602 
603 	rec.rec_len = 0;
604 
605 	if (recovery_head != 0 &&
606 	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
607 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
608 		return -1;
609 	}
610 
611 	*recovery_size = tdb_recovery_size(tdb);
612 
613 	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
614 		/* it fits in the existing area */
615 		*recovery_max_size = rec.rec_len;
616 		*recovery_offset = recovery_head;
617 		return 0;
618 	}
619 
620 	/* we need to free up the old recovery area, then allocate a
621 	   new one at the end of the file. Note that we cannot use
622 	   tdb_allocate() to allocate the new one as that might return
623 	   us an area that is being currently used (as of the start of
624 	   the transaction) */
625 	if (recovery_head != 0) {
626 		if (tdb_free(tdb, recovery_head, &rec) == -1) {
627 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
628 			return -1;
629 		}
630 	}
631 
632 	/* the tdb_free() call might have increased the recovery size */
633 	*recovery_size = tdb_recovery_size(tdb);
634 
635 	/* round up to a multiple of page size */
636 	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
637 	*recovery_offset = tdb->map_size;
638 	recovery_head = *recovery_offset;
639 
640 	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
641 				     (tdb->map_size - tdb->transaction->old_map_size) +
642 				     sizeof(rec) + *recovery_max_size) == -1) {
643 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
644 		return -1;
645 	}
646 
647 	/* remap the file (if using mmap) */
648 	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
649 
650 	/* we have to reset the old map size so that we don't try to expand the file
651 	   again in the transaction commit, which would destroy the recovery area */
652 	tdb->transaction->old_map_size = tdb->map_size;
653 
654 	/* write the recovery header offset and sync - we can sync without a race here
655 	   as the magic ptr in the recovery record has not been set */
656 	CONVERT(recovery_head);
657 	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
658 			       &recovery_head, sizeof(tdb_off_t)) == -1) {
659 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
660 		return -1;
661 	}
662 
663 	return 0;
664 }
665 
666 
667 /*
668   setup the recovery data that will be used on a crash during commit
669 */
transaction_setup_recovery(struct tdb_context * tdb,tdb_off_t * magic_offset)670 static int transaction_setup_recovery(struct tdb_context *tdb,
671 				      tdb_off_t *magic_offset)
672 {
673 	struct tdb_transaction_el *el;
674 	tdb_len_t recovery_size;
675 	unsigned char *data, *p;
676 	const struct tdb_methods *methods = tdb->transaction->io_methods;
677 	struct list_struct *rec;
678 	tdb_off_t recovery_offset, recovery_max_size;
679 	tdb_off_t old_map_size = tdb->transaction->old_map_size;
680 	u32 magic, tailer;
681 
682 	/*
683 	  check that the recovery area has enough space
684 	*/
685 	if (tdb_recovery_allocate(tdb, &recovery_size,
686 				  &recovery_offset, &recovery_max_size) == -1) {
687 		return -1;
688 	}
689 
690 	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
691 	if (data == NULL) {
692 		tdb->ecode = TDB_ERR_OOM;
693 		return -1;
694 	}
695 
696 	rec = (struct list_struct *)data;
697 	memset(rec, 0, sizeof(*rec));
698 
699 	rec->magic    = 0;
700 	rec->data_len = recovery_size;
701 	rec->rec_len  = recovery_max_size;
702 	rec->key_len  = old_map_size;
703 	CONVERT(rec);
704 
705 	/* build the recovery data into a single blob to allow us to do a single
706 	   large write, which should be more efficient */
707 	p = data + sizeof(*rec);
708 	for (el=tdb->transaction->elements;el;el=el->next) {
709 		if (el->offset >= old_map_size) {
710 			continue;
711 		}
712 		if (el->offset + el->length > tdb->transaction->old_map_size) {
713 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
714 			free(data);
715 			tdb->ecode = TDB_ERR_CORRUPT;
716 			return -1;
717 		}
718 		memcpy(p, &el->offset, 4);
719 		memcpy(p+4, &el->length, 4);
720 		if (DOCONV()) {
721 			tdb_convert(p, 8);
722 		}
723 		/* the recovery area contains the old data, not the
724 		   new data, so we have to call the original tdb_read
725 		   method to get it */
726 		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
727 			free(data);
728 			tdb->ecode = TDB_ERR_IO;
729 			return -1;
730 		}
731 		p += 8 + el->length;
732 	}
733 
734 	/* and the tailer */
735 	tailer = sizeof(*rec) + recovery_max_size;
736 	memcpy(p, &tailer, 4);
737 	CONVERT(p);
738 
739 	/* write the recovery data to the recovery area */
740 	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
741 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
742 		free(data);
743 		tdb->ecode = TDB_ERR_IO;
744 		return -1;
745 	}
746 
747 	/* as we don't have ordered writes, we have to sync the recovery
748 	   data before we update the magic to indicate that the recovery
749 	   data is present */
750 	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
751 		free(data);
752 		return -1;
753 	}
754 
755 	free(data);
756 
757 	magic = TDB_RECOVERY_MAGIC;
758 	CONVERT(magic);
759 
760 	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
761 
762 	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
763 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
764 		tdb->ecode = TDB_ERR_IO;
765 		return -1;
766 	}
767 
768 	/* ensure the recovery magic marker is on disk */
769 	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
770 		return -1;
771 	}
772 
773 	return 0;
774 }
775 
776 /*
777   commit the current transaction
778 */
tdb_transaction_commit(struct tdb_context * tdb)779 int tdb_transaction_commit(struct tdb_context *tdb)
780 {
781 	const struct tdb_methods *methods;
782 	tdb_off_t magic_offset = 0;
783 	u32 zero = 0;
784 
785 	if (tdb->transaction == NULL) {
786 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
787 		return -1;
788 	}
789 
790 	if (tdb->transaction->transaction_error) {
791 		tdb->ecode = TDB_ERR_IO;
792 		tdb_transaction_cancel(tdb);
793 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
794 		return -1;
795 	}
796 
797 	if (tdb->transaction->nesting != 0) {
798 		tdb->transaction->nesting--;
799 		return 0;
800 	}
801 
802 	/* check for a null transaction */
803 	if (tdb->transaction->elements == NULL) {
804 		tdb_transaction_cancel(tdb);
805 		return 0;
806 	}
807 
808 	methods = tdb->transaction->io_methods;
809 
810 	/* if there are any locks pending then the caller has not
811 	   nested their locks properly, so fail the transaction */
812 	if (tdb->num_locks || tdb->global_lock.count) {
813 		tdb->ecode = TDB_ERR_LOCK;
814 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
815 		tdb_transaction_cancel(tdb);
816 		return -1;
817 	}
818 
819 	/* upgrade the main transaction lock region to a write lock */
820 	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
821 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
822 		tdb->ecode = TDB_ERR_LOCK;
823 		tdb_transaction_cancel(tdb);
824 		return -1;
825 	}
826 
827 	/* get the global lock - this prevents new users attaching to the database
828 	   during the commit */
829 	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
830 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
831 		tdb->ecode = TDB_ERR_LOCK;
832 		tdb_transaction_cancel(tdb);
833 		return -1;
834 	}
835 
836 	if (!(tdb->flags & TDB_NOSYNC)) {
837 		/* write the recovery data to the end of the file */
838 		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
839 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
840 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
841 			tdb_transaction_cancel(tdb);
842 			return -1;
843 		}
844 	}
845 
846 	/* expand the file to the new size if needed */
847 	if (tdb->map_size != tdb->transaction->old_map_size) {
848 		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
849 					     tdb->map_size -
850 					     tdb->transaction->old_map_size) == -1) {
851 			tdb->ecode = TDB_ERR_IO;
852 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
853 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
854 			tdb_transaction_cancel(tdb);
855 			return -1;
856 		}
857 		tdb->map_size = tdb->transaction->old_map_size;
858 		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
859 	}
860 
861 	/* perform all the writes */
862 	while (tdb->transaction->elements) {
863 		struct tdb_transaction_el *el = tdb->transaction->elements;
864 
865 		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
866 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
867 
868 			/* we've overwritten part of the data and
869 			   possibly expanded the file, so we need to
870 			   run the crash recovery code */
871 			tdb->methods = methods;
872 			tdb_transaction_recover(tdb);
873 
874 			tdb_transaction_cancel(tdb);
875 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
876 
877 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
878 			return -1;
879 		}
880 		tdb->transaction->elements = el->next;
881 		free(el->data);
882 		free(el);
883 	}
884 
885 	if (!(tdb->flags & TDB_NOSYNC)) {
886 		/* ensure the new data is on disk */
887 		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
888 			return -1;
889 		}
890 
891 		/* remove the recovery marker */
892 		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
893 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
894 			return -1;
895 		}
896 
897 		/* ensure the recovery marker has been removed on disk */
898 		if (transaction_sync(tdb, magic_offset, 4) == -1) {
899 			return -1;
900 		}
901 	}
902 
903 	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
904 
905 	/*
906 	  TODO: maybe write to some dummy hdr field, or write to magic
907 	  offset without mmap, before the last sync, instead of the
908 	  utime() call
909 	*/
910 
911 	/* on some systems (like Linux 2.6.x) changes via mmap/msync
912 	   don't change the mtime of the file, this means the file may
913 	   not be backed up (as tdb rounding to block sizes means that
914 	   file size changes are quite rare too). The following forces
915 	   mtime changes when a transaction completes */
916 #ifdef HAVE_UTIME
917 	utime(tdb->name, NULL);
918 #endif
919 
920 	/* use a transaction cancel to free memory and remove the
921 	   transaction locks */
922 	tdb_transaction_cancel(tdb);
923 	return 0;
924 }
925 
926 
927 /*
928   recover from an aborted transaction. Must be called with exclusive
929   database write access already established (including the global
930   lock to prevent new processes attaching)
931 */
tdb_transaction_recover(struct tdb_context * tdb)932 int tdb_transaction_recover(struct tdb_context *tdb)
933 {
934 	tdb_off_t recovery_head, recovery_eof;
935 	unsigned char *data, *p;
936 	u32 zero = 0;
937 	struct list_struct rec;
938 
939 	/* find the recovery area */
940 	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
941 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
942 		tdb->ecode = TDB_ERR_IO;
943 		return -1;
944 	}
945 
946 	if (recovery_head == 0) {
947 		/* we have never allocated a recovery record */
948 		return 0;
949 	}
950 
951 	/* read the recovery record */
952 	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
953 				   sizeof(rec), DOCONV()) == -1) {
954 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
955 		tdb->ecode = TDB_ERR_IO;
956 		return -1;
957 	}
958 
959 	if (rec.magic != TDB_RECOVERY_MAGIC) {
960 		/* there is no valid recovery data */
961 		return 0;
962 	}
963 
964 	if (tdb->read_only) {
965 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
966 		tdb->ecode = TDB_ERR_CORRUPT;
967 		return -1;
968 	}
969 
970 	recovery_eof = rec.key_len;
971 
972 	data = (unsigned char *)malloc(rec.data_len);
973 	if (data == NULL) {
974 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
975 		tdb->ecode = TDB_ERR_OOM;
976 		return -1;
977 	}
978 
979 	/* read the full recovery data */
980 	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
981 				   rec.data_len, 0) == -1) {
982 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
983 		tdb->ecode = TDB_ERR_IO;
984 		return -1;
985 	}
986 
987 	/* recover the file data */
988 	p = data;
989 	while (p+8 < data + rec.data_len) {
990 		u32 ofs, len;
991 		if (DOCONV()) {
992 			tdb_convert(p, 8);
993 		}
994 		memcpy(&ofs, p, 4);
995 		memcpy(&len, p+4, 4);
996 
997 		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
998 			free(data);
999 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1000 			tdb->ecode = TDB_ERR_IO;
1001 			return -1;
1002 		}
1003 		p += 8 + len;
1004 	}
1005 
1006 	free(data);
1007 
1008 	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1009 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1010 		tdb->ecode = TDB_ERR_IO;
1011 		return -1;
1012 	}
1013 
1014 	/* if the recovery area is after the recovered eof then remove it */
1015 	if (recovery_eof <= recovery_head) {
1016 		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1017 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1018 			tdb->ecode = TDB_ERR_IO;
1019 			return -1;
1020 		}
1021 	}
1022 
1023 	/* remove the recovery magic */
1024 	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1025 			  &zero) == -1) {
1026 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1027 		tdb->ecode = TDB_ERR_IO;
1028 		return -1;
1029 	}
1030 
1031 	/* reduce the file size to the old size */
1032 	tdb_munmap(tdb);
1033 	if (ftruncate(tdb->fd, recovery_eof) != 0) {
1034 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1035 		tdb->ecode = TDB_ERR_IO;
1036 		return -1;
1037 	}
1038 	tdb->map_size = recovery_eof;
1039 	tdb_mmap(tdb);
1040 
1041 	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1042 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1043 		tdb->ecode = TDB_ERR_IO;
1044 		return -1;
1045 	}
1046 
1047 	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1048 		 recovery_eof));
1049 
1050 	/* all done */
1051 	return 0;
1052 }
1053