1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 2005
7
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
11
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 2 of the License, or (at your option) any later version.
16
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
21
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30 transaction design:
31
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
36
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
41
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
46
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
51 to the real database.
52
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
56
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
59
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
63
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
67
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
71
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
78
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
83 intervention.
84
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
88
89 */
90
91
/*
  hold the context of any current transaction. One of these is
  allocated by tdb_transaction_start() and hangs off tdb->transaction
  until tdb_transaction_cancel() frees it
*/
struct tdb_transaction {
	/* we keep a mirrored copy of the tdb hash heads here so
	   tdb_next_hash_chain() can operate efficiently. The array is
	   allocated with header.hash_size+1 entries: slot 0 mirrors
	   the freelist head and hash chain h is found in slot h+1 */
	u32 *hash_heads;

	/* the original io methods - used to do IOs to the real db.
	   Saved when the transaction methods are hooked in and put
	   back by tdb_transaction_cancel() */
	const struct tdb_methods *io_methods;

	/* the list of transaction elements. We use a doubly linked
	   list with a last pointer to allow us to keep the list
	   ordered, with first element at the front of the list. It
	   needs to be doubly linked as the read/write traversals need
	   to be backwards, while the commit needs to be forwards */
	struct tdb_transaction_el {
		struct tdb_transaction_el *next, *prev;
		tdb_off_t offset;	/* file offset this pending write applies to */
		tdb_len_t length;	/* number of bytes held in data */
		unsigned char *data;	/* malloc'd copy of the bytes to be written */
	} *elements, *elements_last;

	/* non-zero when an internal transaction error has
	   occurred. All write operations will then fail until the
	   transaction is ended. Also set when a nested transaction
	   is cancelled, which makes the outer commit fail */
	int transaction_error;

	/* when inside a transaction we need to keep track of any
	   nested tdb_transaction_start() calls, as these are allowed,
	   but don't create a new transaction. Zero means we are in
	   the outermost transaction */
	int nesting;

	/* old file size before transaction. Writes at or beyond this
	   offset are left out of the recovery data.
	   tdb_recovery_allocate() resets it after growing the file
	   for a new recovery area */
	tdb_len_t old_map_size;
};
128
129
130 /*
131 read while in a transaction. We need to check first if the data is in our list
132 of transaction elements, then if not do a real read
133 */
transaction_read(struct tdb_context * tdb,tdb_off_t off,void * buf,tdb_len_t len,int cv)134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
135 tdb_len_t len, int cv)
136 {
137 struct tdb_transaction_el *el;
138
139 /* we need to walk the list backwards to get the most recent data */
140 for (el=tdb->transaction->elements_last;el;el=el->prev) {
141 tdb_len_t partial;
142
143 if (off+len <= el->offset) {
144 continue;
145 }
146 if (off >= el->offset + el->length) {
147 continue;
148 }
149
150 /* an overlapping read - needs to be split into up to
151 2 reads and a memcpy */
152 if (off < el->offset) {
153 partial = el->offset - off;
154 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155 goto fail;
156 }
157 len -= partial;
158 off += partial;
159 buf = (void *)(partial + (char *)buf);
160 }
161 if (off + len <= el->offset + el->length) {
162 partial = len;
163 } else {
164 partial = el->offset + el->length - off;
165 }
166 memcpy(buf, el->data + (off - el->offset), partial);
167 if (cv) {
168 tdb_convert(buf, len);
169 }
170 len -= partial;
171 off += partial;
172 buf = (void *)(partial + (char *)buf);
173
174 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175 goto fail;
176 }
177
178 return 0;
179 }
180
181 /* its not in the transaction elements - do a real read */
182 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186 tdb->ecode = TDB_ERR_IO;
187 tdb->transaction->transaction_error = 1;
188 return -1;
189 }
190
191
192 /*
193 write while in a transaction
194 */
transaction_write(struct tdb_context * tdb,tdb_off_t off,const void * buf,tdb_len_t len)195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
196 const void *buf, tdb_len_t len)
197 {
198 struct tdb_transaction_el *el, *best_el=NULL;
199
200 if (len == 0) {
201 return 0;
202 }
203
204 /* if the write is to a hash head, then update the transaction
205 hash heads */
206 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210 }
211
212 /* first see if we can replace an existing entry */
213 for (el=tdb->transaction->elements_last;el;el=el->prev) {
214 tdb_len_t partial;
215
216 if (best_el == NULL && off == el->offset+el->length) {
217 best_el = el;
218 }
219
220 if (off+len <= el->offset) {
221 continue;
222 }
223 if (off >= el->offset + el->length) {
224 continue;
225 }
226
227 /* an overlapping write - needs to be split into up to
228 2 writes and a memcpy */
229 if (off < el->offset) {
230 partial = el->offset - off;
231 if (transaction_write(tdb, off, buf, partial) != 0) {
232 goto fail;
233 }
234 len -= partial;
235 off += partial;
236 buf = (const void *)(partial + (const char *)buf);
237 }
238 if (off + len <= el->offset + el->length) {
239 partial = len;
240 } else {
241 partial = el->offset + el->length - off;
242 }
243 memcpy(el->data + (off - el->offset), buf, partial);
244 len -= partial;
245 off += partial;
246 buf = (const void *)(partial + (const char *)buf);
247
248 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249 goto fail;
250 }
251
252 return 0;
253 }
254
255 /* see if we can append the new entry to an existing entry */
256 if (best_el && best_el->offset + best_el->length == off &&
257 (off+len < tdb->transaction->old_map_size ||
258 off > tdb->transaction->old_map_size)) {
259 unsigned char *data = best_el->data;
260 el = best_el;
261 el->data = (unsigned char *)realloc(el->data,
262 el->length + len);
263 if (el->data == NULL) {
264 tdb->ecode = TDB_ERR_OOM;
265 tdb->transaction->transaction_error = 1;
266 el->data = data;
267 return -1;
268 }
269 if (buf) {
270 memcpy(el->data + el->length, buf, len);
271 } else {
272 memset(el->data + el->length, TDB_PAD_BYTE, len);
273 }
274 el->length += len;
275 return 0;
276 }
277
278 /* add a new entry at the end of the list */
279 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
280 if (el == NULL) {
281 tdb->ecode = TDB_ERR_OOM;
282 tdb->transaction->transaction_error = 1;
283 return -1;
284 }
285 el->next = NULL;
286 el->prev = tdb->transaction->elements_last;
287 el->offset = off;
288 el->length = len;
289 el->data = (unsigned char *)malloc(len);
290 if (el->data == NULL) {
291 free(el);
292 tdb->ecode = TDB_ERR_OOM;
293 tdb->transaction->transaction_error = 1;
294 return -1;
295 }
296 if (buf) {
297 memcpy(el->data, buf, len);
298 } else {
299 memset(el->data, TDB_PAD_BYTE, len);
300 }
301 if (el->prev) {
302 el->prev->next = el;
303 } else {
304 tdb->transaction->elements = el;
305 }
306 tdb->transaction->elements_last = el;
307 return 0;
308
309 fail:
310 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
311 tdb->ecode = TDB_ERR_IO;
312 tdb->transaction->transaction_error = 1;
313 return -1;
314 }
315
316 /*
317 accelerated hash chain head search, using the cached hash heads
318 */
transaction_next_hash_chain(struct tdb_context * tdb,u32 * chain)319 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
320 {
321 u32 h = *chain;
322 for (;h < tdb->header.hash_size;h++) {
323 /* the +1 takes account of the freelist */
324 if (0 != tdb->transaction->hash_heads[h+1]) {
325 break;
326 }
327 }
328 (*chain) = h;
329 }
330
331 /*
332 out of bounds check during a transaction
333 */
transaction_oob(struct tdb_context * tdb,tdb_off_t len,int probe)334 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 {
336 if (len <= tdb->map_size) {
337 return 0;
338 }
339 return TDB_ERRCODE(TDB_ERR_IO, -1);
340 }
341
342 /*
343 transaction version of tdb_expand().
344 */
transaction_expand_file(struct tdb_context * tdb,tdb_off_t size,tdb_off_t addition)345 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
346 tdb_off_t addition)
347 {
348 /* add a write to the transaction elements, so subsequent
349 reads see the zero data */
350 if (transaction_write(tdb, size, NULL, addition) != 0) {
351 return -1;
352 }
353
354 return 0;
355 }
356
357 /*
358 brlock during a transaction - ignore them
359 */
transaction_brlock(struct tdb_context * tdb,tdb_off_t offset,int rw_type,int lck_type,int probe,size_t len)360 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
361 int rw_type, int lck_type, int probe, size_t len)
362 {
363 return 0;
364 }
365
/*
  the io method table that tdb_transaction_start() installs in
  tdb->methods for the duration of a transaction. This is a positional
  initialiser, so the entries have to stay in the member order of
  struct tdb_methods (declared outside this file)
*/
static const struct tdb_methods transaction_methods = {
	transaction_read,		/* reads consult the pending writes first */
	transaction_write,		/* writes are queued, not sent to the file */
	transaction_next_hash_chain,	/* hash scan using the mirrored heads */
	transaction_oob,		/* bounds check against tdb->map_size only */
	transaction_expand_file,	/* expansion is queued as a pad write */
	transaction_brlock		/* lock requests are ignored */
};
374
375
376 /*
377 start a tdb transaction. No token is returned, as only a single
378 transaction is allowed to be pending per tdb_context
379 */
tdb_transaction_start(struct tdb_context * tdb)380 int tdb_transaction_start(struct tdb_context *tdb)
381 {
382 /* some sanity checks */
383 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
384 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
385 tdb->ecode = TDB_ERR_EINVAL;
386 return -1;
387 }
388
389 /* cope with nested tdb_transaction_start() calls */
390 if (tdb->transaction != NULL) {
391 tdb->transaction->nesting++;
392 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
393 tdb->transaction->nesting));
394 return 0;
395 }
396
397 if (tdb->num_locks != 0 || tdb->global_lock.count) {
398 /* the caller must not have any locks when starting a
399 transaction as otherwise we'll be screwed by lack
400 of nested locks in posix */
401 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
402 tdb->ecode = TDB_ERR_LOCK;
403 return -1;
404 }
405
406 if (tdb->travlocks.next != NULL) {
407 /* you cannot use transactions inside a traverse (although you can use
408 traverse inside a transaction) as otherwise you can end up with
409 deadlock */
410 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
411 tdb->ecode = TDB_ERR_LOCK;
412 return -1;
413 }
414
415 tdb->transaction = (struct tdb_transaction *)
416 calloc(sizeof(struct tdb_transaction), 1);
417 if (tdb->transaction == NULL) {
418 tdb->ecode = TDB_ERR_OOM;
419 return -1;
420 }
421
422 /* get the transaction write lock. This is a blocking lock. As
423 discussed with Volker, there are a number of ways we could
424 make this async, which we will probably do in the future */
425 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
426 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
427 tdb->ecode = TDB_ERR_LOCK;
428 SAFE_FREE(tdb->transaction);
429 return -1;
430 }
431
432 /* get a read lock from the freelist to the end of file. This
433 is upgraded to a write lock during the commit */
434 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
435 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
436 tdb->ecode = TDB_ERR_LOCK;
437 goto fail;
438 }
439
440 /* setup a copy of the hash table heads so the hash scan in
441 traverse can be fast */
442 tdb->transaction->hash_heads = (u32 *)
443 calloc(tdb->header.hash_size+1, sizeof(u32));
444 if (tdb->transaction->hash_heads == NULL) {
445 tdb->ecode = TDB_ERR_OOM;
446 goto fail;
447 }
448 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
449 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
450 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
451 tdb->ecode = TDB_ERR_IO;
452 goto fail;
453 }
454
455 /* make sure we know about any file expansions already done by
456 anyone else */
457 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
458 tdb->transaction->old_map_size = tdb->map_size;
459
460 /* finally hook the io methods, replacing them with
461 transaction specific methods */
462 tdb->transaction->io_methods = tdb->methods;
463 tdb->methods = &transaction_methods;
464
465 /* by calling this transaction write here, we ensure that we don't grow the
466 transaction linked list due to hash table updates */
467 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
468 TDB_HASHTABLE_SIZE(tdb)) != 0) {
469 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
470 tdb->ecode = TDB_ERR_IO;
471 goto fail;
472 }
473
474 return 0;
475
476 fail:
477 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
479 SAFE_FREE(tdb->transaction->hash_heads);
480 SAFE_FREE(tdb->transaction);
481 return -1;
482 }
483
484
485 /*
486 cancel the current transaction
487 */
tdb_transaction_cancel(struct tdb_context * tdb)488 int tdb_transaction_cancel(struct tdb_context *tdb)
489 {
490 if (tdb->transaction == NULL) {
491 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492 return -1;
493 }
494
495 if (tdb->transaction->nesting != 0) {
496 tdb->transaction->transaction_error = 1;
497 tdb->transaction->nesting--;
498 return 0;
499 }
500
501 tdb->map_size = tdb->transaction->old_map_size;
502
503 /* free all the transaction elements */
504 while (tdb->transaction->elements) {
505 struct tdb_transaction_el *el = tdb->transaction->elements;
506 tdb->transaction->elements = el->next;
507 free(el->data);
508 free(el);
509 }
510
511 /* remove any global lock created during the transaction */
512 if (tdb->global_lock.count != 0) {
513 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514 tdb->global_lock.count = 0;
515 }
516
517 /* remove any locks created during the transaction */
518 if (tdb->num_locks != 0) {
519 int h;
520 for (h=0;h<tdb->header.hash_size+1;h++) {
521 if (tdb->locked[h].count != 0) {
522 tdb_brlock(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
523 tdb->locked[h].count = 0;
524 }
525 }
526 tdb->num_locks = 0;
527 }
528
529 /* restore the normal io methods */
530 tdb->methods = tdb->transaction->io_methods;
531
532 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
533 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
534 SAFE_FREE(tdb->transaction->hash_heads);
535 SAFE_FREE(tdb->transaction);
536
537 return 0;
538 }
539
540 /*
541 sync to disk
542 */
transaction_sync(struct tdb_context * tdb,tdb_off_t offset,tdb_len_t length)543 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
544 {
545 if (fsync(tdb->fd) != 0) {
546 tdb->ecode = TDB_ERR_IO;
547 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
548 return -1;
549 }
550 #ifdef MS_SYNC
551 if (tdb->map_ptr) {
552 tdb_off_t moffset = offset & ~(tdb->page_size-1);
553 if (msync(moffset + (char *)tdb->map_ptr,
554 length + (offset - moffset), MS_SYNC) != 0) {
555 tdb->ecode = TDB_ERR_IO;
556 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
557 strerror(errno)));
558 return -1;
559 }
560 }
561 #endif
562 return 0;
563 }
564
565
566 /*
567 work out how much space the linearised recovery data will consume
568 */
tdb_recovery_size(struct tdb_context * tdb)569 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
570 {
571 struct tdb_transaction_el *el;
572 tdb_len_t recovery_size = 0;
573
574 recovery_size = sizeof(u32);
575 for (el=tdb->transaction->elements;el;el=el->next) {
576 if (el->offset >= tdb->transaction->old_map_size) {
577 continue;
578 }
579 recovery_size += 2*sizeof(tdb_off_t) + el->length;
580 }
581
582 return recovery_size;
583 }
584
/*
  allocate the recovery area, or use an existing recovery area if it is
  large enough

  On success:
    *recovery_size      bytes of recovery data that will be written
    *recovery_offset    file offset of the recovery record header
    *recovery_max_size  capacity of the recovery area, not counting
                        the record header

  Calls made through 'methods' go to the real file; tdb_ofs_read() and
  tdb_free() are called with the transaction still active.

  returns 0 on success, -1 on failure
*/
static int tdb_recovery_allocate(struct tdb_context *tdb,
				 tdb_len_t *recovery_size,
				 tdb_off_t *recovery_offset,
				 tdb_len_t *recovery_max_size)
{
	struct list_struct rec;
	const struct tdb_methods *methods = tdb->transaction->io_methods;
	tdb_off_t recovery_head;

	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
		return -1;
	}

	/* stays zero if no recovery area has been allocated yet */
	rec.rec_len = 0;

	if (recovery_head != 0 &&
	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
		return -1;
	}

	*recovery_size = tdb_recovery_size(tdb);

	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
		/* it fits in the existing area */
		*recovery_max_size = rec.rec_len;
		*recovery_offset = recovery_head;
		return 0;
	}

	/* we need to free up the old recovery area, then allocate a
	   new one at the end of the file. Note that we cannot use
	   tdb_allocate() to allocate the new one as that might return
	   us an area that is being currently used (as of the start of
	   the transaction) */
	if (recovery_head != 0) {
		if (tdb_free(tdb, recovery_head, &rec) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
			return -1;
		}
	}

	/* the tdb_free() call might have increased the recovery size */
	*recovery_size = tdb_recovery_size(tdb);

	/* round up to a multiple of page size */
	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
	/* the new area starts at the current (in-transaction) end of file */
	*recovery_offset = tdb->map_size;
	recovery_head = *recovery_offset;

	/* really grow the file: this covers both the expansion done
	   inside the transaction so far and the new recovery area */
	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
				     (tdb->map_size - tdb->transaction->old_map_size) +
				     sizeof(rec) + *recovery_max_size) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
		return -1;
	}

	/* remap the file (if using mmap) */
	methods->tdb_oob(tdb, tdb->map_size + 1, 1);

	/* we have to reset the old map size so that we don't try to expand the file
	   again in the transaction commit, which would destroy the recovery area */
	tdb->transaction->old_map_size = tdb->map_size;

	/* write the recovery header offset. No sync is issued here;
	   that is safe as the magic in the recovery record has not
	   been set yet */
	CONVERT(recovery_head);
	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
			       &recovery_head, sizeof(tdb_off_t)) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
		return -1;
	}

	return 0;
}
665
666
667 /*
668 setup the recovery data that will be used on a crash during commit
669 */
transaction_setup_recovery(struct tdb_context * tdb,tdb_off_t * magic_offset)670 static int transaction_setup_recovery(struct tdb_context *tdb,
671 tdb_off_t *magic_offset)
672 {
673 struct tdb_transaction_el *el;
674 tdb_len_t recovery_size;
675 unsigned char *data, *p;
676 const struct tdb_methods *methods = tdb->transaction->io_methods;
677 struct list_struct *rec;
678 tdb_off_t recovery_offset, recovery_max_size;
679 tdb_off_t old_map_size = tdb->transaction->old_map_size;
680 u32 magic, tailer;
681
682 /*
683 check that the recovery area has enough space
684 */
685 if (tdb_recovery_allocate(tdb, &recovery_size,
686 &recovery_offset, &recovery_max_size) == -1) {
687 return -1;
688 }
689
690 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
691 if (data == NULL) {
692 tdb->ecode = TDB_ERR_OOM;
693 return -1;
694 }
695
696 rec = (struct list_struct *)data;
697 memset(rec, 0, sizeof(*rec));
698
699 rec->magic = 0;
700 rec->data_len = recovery_size;
701 rec->rec_len = recovery_max_size;
702 rec->key_len = old_map_size;
703 CONVERT(rec);
704
705 /* build the recovery data into a single blob to allow us to do a single
706 large write, which should be more efficient */
707 p = data + sizeof(*rec);
708 for (el=tdb->transaction->elements;el;el=el->next) {
709 if (el->offset >= old_map_size) {
710 continue;
711 }
712 if (el->offset + el->length > tdb->transaction->old_map_size) {
713 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
714 free(data);
715 tdb->ecode = TDB_ERR_CORRUPT;
716 return -1;
717 }
718 memcpy(p, &el->offset, 4);
719 memcpy(p+4, &el->length, 4);
720 if (DOCONV()) {
721 tdb_convert(p, 8);
722 }
723 /* the recovery area contains the old data, not the
724 new data, so we have to call the original tdb_read
725 method to get it */
726 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
727 free(data);
728 tdb->ecode = TDB_ERR_IO;
729 return -1;
730 }
731 p += 8 + el->length;
732 }
733
734 /* and the tailer */
735 tailer = sizeof(*rec) + recovery_max_size;
736 memcpy(p, &tailer, 4);
737 CONVERT(p);
738
739 /* write the recovery data to the recovery area */
740 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
741 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
742 free(data);
743 tdb->ecode = TDB_ERR_IO;
744 return -1;
745 }
746
747 /* as we don't have ordered writes, we have to sync the recovery
748 data before we update the magic to indicate that the recovery
749 data is present */
750 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
751 free(data);
752 return -1;
753 }
754
755 free(data);
756
757 magic = TDB_RECOVERY_MAGIC;
758 CONVERT(magic);
759
760 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
761
762 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
763 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
764 tdb->ecode = TDB_ERR_IO;
765 return -1;
766 }
767
768 /* ensure the recovery magic marker is on disk */
769 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
770 return -1;
771 }
772
773 return 0;
774 }
775
776 /*
777 commit the current transaction
778 */
tdb_transaction_commit(struct tdb_context * tdb)779 int tdb_transaction_commit(struct tdb_context *tdb)
780 {
781 const struct tdb_methods *methods;
782 tdb_off_t magic_offset = 0;
783 u32 zero = 0;
784
785 if (tdb->transaction == NULL) {
786 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
787 return -1;
788 }
789
790 if (tdb->transaction->transaction_error) {
791 tdb->ecode = TDB_ERR_IO;
792 tdb_transaction_cancel(tdb);
793 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
794 return -1;
795 }
796
797 if (tdb->transaction->nesting != 0) {
798 tdb->transaction->nesting--;
799 return 0;
800 }
801
802 /* check for a null transaction */
803 if (tdb->transaction->elements == NULL) {
804 tdb_transaction_cancel(tdb);
805 return 0;
806 }
807
808 methods = tdb->transaction->io_methods;
809
810 /* if there are any locks pending then the caller has not
811 nested their locks properly, so fail the transaction */
812 if (tdb->num_locks || tdb->global_lock.count) {
813 tdb->ecode = TDB_ERR_LOCK;
814 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
815 tdb_transaction_cancel(tdb);
816 return -1;
817 }
818
819 /* upgrade the main transaction lock region to a write lock */
820 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
821 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
822 tdb->ecode = TDB_ERR_LOCK;
823 tdb_transaction_cancel(tdb);
824 return -1;
825 }
826
827 /* get the global lock - this prevents new users attaching to the database
828 during the commit */
829 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
830 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
831 tdb->ecode = TDB_ERR_LOCK;
832 tdb_transaction_cancel(tdb);
833 return -1;
834 }
835
836 if (!(tdb->flags & TDB_NOSYNC)) {
837 /* write the recovery data to the end of the file */
838 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
839 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
840 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
841 tdb_transaction_cancel(tdb);
842 return -1;
843 }
844 }
845
846 /* expand the file to the new size if needed */
847 if (tdb->map_size != tdb->transaction->old_map_size) {
848 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
849 tdb->map_size -
850 tdb->transaction->old_map_size) == -1) {
851 tdb->ecode = TDB_ERR_IO;
852 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
853 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
854 tdb_transaction_cancel(tdb);
855 return -1;
856 }
857 tdb->map_size = tdb->transaction->old_map_size;
858 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
859 }
860
861 /* perform all the writes */
862 while (tdb->transaction->elements) {
863 struct tdb_transaction_el *el = tdb->transaction->elements;
864
865 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
866 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
867
868 /* we've overwritten part of the data and
869 possibly expanded the file, so we need to
870 run the crash recovery code */
871 tdb->methods = methods;
872 tdb_transaction_recover(tdb);
873
874 tdb_transaction_cancel(tdb);
875 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
876
877 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
878 return -1;
879 }
880 tdb->transaction->elements = el->next;
881 free(el->data);
882 free(el);
883 }
884
885 if (!(tdb->flags & TDB_NOSYNC)) {
886 /* ensure the new data is on disk */
887 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
888 return -1;
889 }
890
891 /* remove the recovery marker */
892 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
893 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
894 return -1;
895 }
896
897 /* ensure the recovery marker has been removed on disk */
898 if (transaction_sync(tdb, magic_offset, 4) == -1) {
899 return -1;
900 }
901 }
902
903 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
904
905 /*
906 TODO: maybe write to some dummy hdr field, or write to magic
907 offset without mmap, before the last sync, instead of the
908 utime() call
909 */
910
911 /* on some systems (like Linux 2.6.x) changes via mmap/msync
912 don't change the mtime of the file, this means the file may
913 not be backed up (as tdb rounding to block sizes means that
914 file size changes are quite rare too). The following forces
915 mtime changes when a transaction completes */
916 #ifdef HAVE_UTIME
917 utime(tdb->name, NULL);
918 #endif
919
920 /* use a transaction cancel to free memory and remove the
921 transaction locks */
922 tdb_transaction_cancel(tdb);
923 return 0;
924 }
925
926
927 /*
928 recover from an aborted transaction. Must be called with exclusive
929 database write access already established (including the global
930 lock to prevent new processes attaching)
931 */
tdb_transaction_recover(struct tdb_context * tdb)932 int tdb_transaction_recover(struct tdb_context *tdb)
933 {
934 tdb_off_t recovery_head, recovery_eof;
935 unsigned char *data, *p;
936 u32 zero = 0;
937 struct list_struct rec;
938
939 /* find the recovery area */
940 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
941 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
942 tdb->ecode = TDB_ERR_IO;
943 return -1;
944 }
945
946 if (recovery_head == 0) {
947 /* we have never allocated a recovery record */
948 return 0;
949 }
950
951 /* read the recovery record */
952 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
953 sizeof(rec), DOCONV()) == -1) {
954 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
955 tdb->ecode = TDB_ERR_IO;
956 return -1;
957 }
958
959 if (rec.magic != TDB_RECOVERY_MAGIC) {
960 /* there is no valid recovery data */
961 return 0;
962 }
963
964 if (tdb->read_only) {
965 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
966 tdb->ecode = TDB_ERR_CORRUPT;
967 return -1;
968 }
969
970 recovery_eof = rec.key_len;
971
972 data = (unsigned char *)malloc(rec.data_len);
973 if (data == NULL) {
974 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
975 tdb->ecode = TDB_ERR_OOM;
976 return -1;
977 }
978
979 /* read the full recovery data */
980 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
981 rec.data_len, 0) == -1) {
982 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
983 tdb->ecode = TDB_ERR_IO;
984 return -1;
985 }
986
987 /* recover the file data */
988 p = data;
989 while (p+8 < data + rec.data_len) {
990 u32 ofs, len;
991 if (DOCONV()) {
992 tdb_convert(p, 8);
993 }
994 memcpy(&ofs, p, 4);
995 memcpy(&len, p+4, 4);
996
997 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
998 free(data);
999 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1000 tdb->ecode = TDB_ERR_IO;
1001 return -1;
1002 }
1003 p += 8 + len;
1004 }
1005
1006 free(data);
1007
1008 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1009 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1010 tdb->ecode = TDB_ERR_IO;
1011 return -1;
1012 }
1013
1014 /* if the recovery area is after the recovered eof then remove it */
1015 if (recovery_eof <= recovery_head) {
1016 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1017 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1018 tdb->ecode = TDB_ERR_IO;
1019 return -1;
1020 }
1021 }
1022
1023 /* remove the recovery magic */
1024 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1025 &zero) == -1) {
1026 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1027 tdb->ecode = TDB_ERR_IO;
1028 return -1;
1029 }
1030
1031 /* reduce the file size to the old size */
1032 tdb_munmap(tdb);
1033 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1034 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1035 tdb->ecode = TDB_ERR_IO;
1036 return -1;
1037 }
1038 tdb->map_size = recovery_eof;
1039 tdb_mmap(tdb);
1040
1041 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1042 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1043 tdb->ecode = TDB_ERR_IO;
1044 return -1;
1045 }
1046
1047 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1048 recovery_eof));
1049
1050 /* all done */
1051 return 0;
1052 }
1053