1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 /*******************************************************************************
36 *
37 * Indexes table handling functions
38 *
 * These are the tables containing the offsets to a serial.
 * They are linked together.
 * They are often referenced by a unique table of indexes.
42 *
43 ******************************************************************************/
44
45 #define JOURNAL_CJF_BASE 1
46
47 #include "dnsdb/dnsdb-config.h"
48
49 #define ZDB_JOURNAL_CODE 1
50
51 #include "dnsdb/journal.h"
52
53 #if JOURNAL_CJF_ENABLED
54
55 #include "dnsdb/journal-cjf-page-cache.h"
56 #include "dnsdb/journal-cjf-idxt.h"
57 #include "dnsdb/journal-cjf-common.h"
58
59 #include <dnscore/logger.h>
60 #include <dnscore/file_input_stream.h>
61 #include <dnscore/buffer_input_stream.h>
62 #include <dnscore/file_output_stream.h>
63 #include <dnscore/buffer_output_stream.h>
64 #include <dnscore/fdtools.h>
65 #include <dnscore/serial.h>
66
67 extern logger_handle* g_database_logger;
68 #define MODULE_MSG_HANDLE g_database_logger
69
70 #define CJF_IDXT_SLOT_SIZE 8
71
72
73 const journal_cjf_idxt_tbl_item*
journal_cjf_idxt_get_entry(const journal_cjf * jnl,s16 index)74 journal_cjf_idxt_get_entry(const journal_cjf *jnl, s16 index)
75 {
76 yassert(index >= 0);yassert(jnl->idxt.first >= 0);
77
78 journal_cjf_idxt_tbl_item *entry;
79 entry = &jnl->idxt.entries[(jnl->idxt.first + index) % jnl->idxt.size];
80 return entry;
81 }
82
83 s16
journal_cjf_idxt_size(const journal_cjf * jnl)84 journal_cjf_idxt_size(const journal_cjf *jnl)
85 {
86 return jnl->idxt.count;
87 }
88
89 s16
journal_cjf_idxt_size_max(const journal_cjf * jnl)90 journal_cjf_idxt_size_max(const journal_cjf *jnl)
91 {
92 return jnl->idxt.size;
93 }
94
95 /**
96 *
97 * Returns the file offset value at index in the current IDXT
98 *
99 * @param jnl
100 * @param index
101 * @return
102 */
103
104 u32
journal_cjf_idxt_get_file_offset(const journal_cjf * jnl,s16 index)105 journal_cjf_idxt_get_file_offset(const journal_cjf *jnl, s16 index)
106 {
107 u32 file_offset = journal_cjf_idxt_get_entry(jnl, index)->file_offset;
108 return file_offset;
109 }
110
111 u32
journal_cjf_idxt_get_last_file_offset(const journal_cjf * jnl)112 journal_cjf_idxt_get_last_file_offset(const journal_cjf *jnl)
113 {
114 if(jnl->idxt.count > 0)
115 {
116 u32 n = journal_cjf_idxt_get_file_offset(jnl, jnl->idxt.count - 1);
117 return n;
118 }
119 else
120 {
121 return 0;
122 }
123 }
124
125 /**
126 *
127 * Returns the last serial number value at index in the IDXT
128 *
129 * @param jnl
130 * @param index
131 * @return
132 */
133
134 u32
journal_cjf_idxt_get_last_serial(const journal_cjf * jnl,s16 index)135 journal_cjf_idxt_get_last_serial(const journal_cjf *jnl, s16 index)
136 {
137 u32 last_serial = journal_cjf_idxt_get_entry(jnl, index)->last_serial;
138 return last_serial;
139 }
140
141 /**
142 * Creates an empty table of indexes (IDXT) for the journal, with a minimum number of entries.
143 * Nothing is written to disk.
144 *
145 * @param jnl
146 * @param entries
147 */
148
149 void
journal_cjf_idxt_create(journal_cjf * jnl,s16 entries)150 journal_cjf_idxt_create(journal_cjf *jnl, s16 entries)
151 {
152 yassert(jnl->idxt.size == 0);
153 yassert(entries >= 0);
154
155 jnl->idxt.count = 1;
156 jnl->idxt.first = 0;
157 jnl->idxt.size = entries + 1;
158 jnl->idxt.dirty = TRUE;
159 jnl->idxt.marked = 0;
160
161 MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, jnl->idxt.entries, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size, JCJFITI_TAG);
162 ZEROMEMORY(jnl->idxt.entries, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size);
163
164 jnl->idxt.entries[0].last_serial = jnl->serial_begin;
165 jnl->idxt.entries[0].file_offset = jnl->last_page.file_offset;
166 jnl->first_page_offset = jnl->last_page.file_offset;
167 }
168
169 /**
170 * Verifies the page table chain
171 * To be used for small discrepancies between the header and the page table (idxt)
172 *
173 * @param jnl
174 * @return
175 */
176
177 static ya_result
journal_cjf_idxt_verify(journal_cjf * jnl)178 journal_cjf_idxt_verify(journal_cjf *jnl)
179 {
180 // check the values of the pages
181 // serials have to be ordered in serial arithmetics
182 // pages are supposed to start after each other except a looping ones that goes after the header
183
184 // if the verify fails, a scan may be needed (other procedure)
185
186 journal_cjf_page_tbl_header page_hdr;
187 u32 previous_page_offset;
188 u32 stream_end_offset;
189 u32 next_page_offset; // uninitialised false positive: either size is <= 0, skipping for & if, either it's >= 0 and page_hrd is set and initialises next_page_offset
190 u32 prev_serial = jnl->serial_begin;
191 int loops = 0;
192 bool error = FALSE;
193 bool has_page_after_header = FALSE;
194
195 s16 size = journal_cjf_idxt_size(jnl);
196
197 for(int page = 0; page < size; ++page)
198 {
199 const journal_cjf_idxt_tbl_item* entry = journal_cjf_idxt_get_entry(jnl, page);
200
201 has_page_after_header |= entry->file_offset == CJF_HEADER_SIZE;
202
203 if(page > 0)
204 {
205 if(entry->file_offset != next_page_offset) // gcc false positive: next_page_offset is initialised when page == 0
206 {
207 // page do not start at the expected position
208 log_err("cjf: %{dnsname}: page[%i] starts at an unexpected position (%u != expected %u)", jnl->origin, page, entry->file_offset, next_page_offset);
209 error = TRUE;
210 }
211
212 if(entry->file_offset == previous_page_offset)
213 {
214 // broken chain
215 log_err("cjf: %{dnsname}: page[%i] is a duplicate at position %u", jnl->origin, page, entry->file_offset);
216 error = TRUE;
217 }
218
219 if(entry->file_offset > previous_page_offset)
220 {
221 if(entry->file_offset != stream_end_offset)
222 {
223 // suspicious hole in the file
224 log_err("cjf: %{dnsname}: page[%i] is %u bytes after the expected position", jnl->origin, page, entry->file_offset - stream_end_offset);
225 error = TRUE;
226 }
227 }
228 else
229 {
230 // just looped ...
231
232 if(loops == 0)
233 {
234 if(entry->file_offset > CJF_HEADER_SIZE)
235 {
236 // looped at an unexpected position
237 log_err("cjf: %{dnsname}: page[%i] looped at an unexpected position (%u != expected %u)", jnl->origin, page, entry->file_offset, CJF_HEADER_SIZE);
238 error = TRUE;
239 }
240 else if(entry->file_offset > CJF_HEADER_SIZE)
241 {
242 // looped at an unexpected position
243 log_err("cjf: %{dnsname}: page[%i] looped into the header position (%u < %u)", jnl->origin, page, entry->file_offset, CJF_HEADER_SIZE);
244 error = TRUE;
245 }
246
247 loops = 1;
248 }
249 else
250 {
251 // should only have looped once
252 log_err("cjf: %{dnsname}: page[%i] looped for a second time", jnl->origin, page);
253 error = TRUE;
254 }
255 }
256
257 if(error)
258 {
259 // got at least one error
260 return ERROR;
261 }
262 }
263
264 ssize_t pos = file_pool_seek(jnl->file, entry->file_offset, SEEK_SET);
265
266 if(pos < 0)
267 {
268 // invalid position (as EBADF should not happen)
269 ya_result ret = ERRNO_ERROR;
270
271 log_err("cjf: %{dnsname}: page[%i] seek at %u returned %r", jnl->origin, page, entry->file_offset, ret);
272
273 return ret;
274 }
275
276 int len = file_pool_readfully(jnl->file, &page_hdr, CJF_SECTION_INDEX_SLOT_HEAD);
277
278 if(len != CJF_SECTION_INDEX_SLOT_HEAD)
279 {
280 if(len >= 0)
281 {
282 log_err("cjf: %{dnsname}: page[%i] short count reading the header (%u < %u)", jnl->origin, page, len, CJF_SECTION_INDEX_SLOT_HEAD);
283 return ERROR; // short
284 }
285 else
286 {
287 log_err("cjf: %{dnsname}: page[%i] error reading the header: %r", jnl->origin, page, len);
288 return len; // other error
289 }
290 }
291
292 if(page_hdr.magic != CJF_PAGE_MAGIC)
293 {
294 // page is corrupted
295 log_err("cjf: %{dnsname}: page[%i] corrupted magic", jnl->origin, page);
296 return ERROR;
297 }
298 if(page_hdr.count > page_hdr.size)
299 {
300 // page is corrupted
301 log_err("cjf: %{dnsname}: page[%i] says to contain more than allowed", jnl->origin, page);
302 return ERROR;
303 }
304
305 if(page_hdr.count == 0)
306 {
307 // empty page (warning)
308 log_warn("cjf: %{dnsname}: page[%i] is empty", jnl->origin, page);
309 }
310
311 if(serial_gt(prev_serial, entry->last_serial))
312 {
313 // suspicious serial backward jump
314 log_err("cjf: %{dnsname}: page[%i] serial jumped back from %u to %u", jnl->origin, page, prev_serial, entry->last_serial);
315 }
316 else if(serial_eq(prev_serial, entry->last_serial))
317 {
318 // suspicious serial standstill
319 log_err("cjf: %{dnsname}: page[%i] serial didn't changed from %u", jnl->origin, page, prev_serial);
320 }
321
322 previous_page_offset = entry->file_offset;
323 next_page_offset = page_hdr.next_page_offset; // next page, 0 for the last one
324 stream_end_offset = page_hdr.stream_end_offset; // start of next page, start of page table, or 0 for the last in the chain
325 }
326
327 if(size > 0)
328 {
329 if(next_page_offset != 0) // gcc false positive: size > 0 => next_page_offset has been set to page_hdr.next_page_offset (read from the disk)
330 {
331 // chain end was not marked
332 log_err("cjf: %{dnsname}: page[%i] is last but points to a next at %u", jnl->origin, size - 1, next_page_offset);
333 return ERROR;
334 }
335 if(!has_page_after_header)
336 {
337 // no page at an obvious position
338 log_err("cjf: %{dnsname}: page table has no page at position %u", jnl->origin, CJF_HEADER_SIZE);
339 return ERROR;
340 }
341 }
342 else
343 {
344 // table is empty
345 log_err("cjf: %{dnsname}: page table is empty", jnl->origin);
346 return ERROR;
347 }
348
349 return SUCCESS;
350 }
351
352 /**
353 * Loads (or rebuilds) the table of indexes (IDXT)
354 *
355 * @param jnl
356 */
357
358 void
journal_cjf_idxt_load(journal_cjf * jnl)359 journal_cjf_idxt_load(journal_cjf *jnl)
360 {
361 if(jnl->idxt.entries != NULL)
362 {
363 // already loaded ...
364 return;
365 }
366
367 // the file is opened
368
369 if(jnl->page_table_file_offset != 0)
370 {
371 log_debug1("journal_cjf_idxt_load: loading stored IDXT from '%s'", jnl->journal_file_name);
372
373 // load
374 file_pool_seek(jnl->file, jnl->page_table_file_offset, SEEK_SET);
375
376 input_stream fis;
377 input_stream bis;
378 file_pool_file_input_stream_init(&fis, jnl->file);
379
380 buffer_input_stream_init(&bis, &fis, 512);
381 u8 magic[4];
382 input_stream_read(&bis, magic, 4);
383 u32 *magic_u32p = (u32*)&magic[0];
384 if(*magic_u32p == CJF_IDXT_MAGIC)
385 {
386 s16 count;
387 input_stream_read(&bis, (u8*)&count , 2);
388
389 journal_cjf_idxt_create(jnl, count + 1);
390
391 input_stream_read(&bis, (u8*)&jnl->idxt.entries[0], count * CJF_IDXT_SLOT_SIZE);
392
393 file_pool_file_input_stream_detach(buffer_input_stream_get_filtered(&bis));
394 input_stream_close(&bis);
395
396 jnl->idxt.count = count;
397
398 u32 first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 0);
399
400 if(jnl->first_page_offset != first_page_offset)
401 {
402 // discrepancy : check the IDXT is valid
403
404 if(ISOK(journal_cjf_idxt_verify(jnl)))
405 {
406 // the header is wrong, update it
407
408 jnl->first_page_offset = first_page_offset;
409 }
410 }
411
412 return;
413 }
414
415 file_pool_file_input_stream_detach(buffer_input_stream_get_filtered(&bis));
416 input_stream_close(&bis);
417
418 // ERROR, need to rebuild
419 }
420
421 log_debug1("journal_cjf_idxt_load: rebuilding IDXT from '%s', following the PAGE", jnl->journal_file_name);
422
423 // rebuild
424
425 journal_cjf_page_tbl_item *tbl;
426 u32 size = 512;
427 journal_cjf_page_tbl_item tmp_tbl[512];
428 tbl = tmp_tbl;
429
430 if(jnl->first_page_offset < JOURNAL_CJF_PAGE_HEADER_SIZE)
431 {
432 // the PAGE chain has been lost : start from HEAD and follow the chain
433 // then after the 0, scan from the furthest known byte for PAGE+offset
434 // and follow until the chain points back to offset sizeof(head)
435 }
436
437 // read the PAGE chain from the file (no caching)
438
439 u32 index_offset = jnl->first_page_offset;
440 //u32 current_serial = jnl->serial_begin;
441 journal_cjf_page_tbl_header page_header;
442 journal_cjf_page_tbl_item page_last_item;
443 s16 idx = 0;
444 u32 page_serial = 0;
445 bool page_read = FALSE;
446
447 do
448 {
449 // move to the page offset and read the header
450
451 log_debug2("journal_cjf_idxt_load: reading '%s' PAGE header at %x", jnl->journal_file_name, index_offset);
452
453 file_pool_seek(jnl->file, index_offset, SEEK_SET);
454 if(file_pool_readfully(jnl->file, &page_header, JOURNAL_CJF_PAGE_HEADER_SIZE) != JOURNAL_CJF_PAGE_HEADER_SIZE) // next offset
455 {
456 log_err("journal_cjf_idxt_load: '%s' is too corrupt to go on further reading PAGE header at %x", jnl->journal_file_name, index_offset);
457 break;
458 }
459 /*
460 if(page_header.magic != CJF_PAGE_MAGIC)
461 {
462 // corrupt
463 }
464 */
465 if(page_header.count > 0)
466 {
467 u32 tail_offset = (page_header.count - 1) * CJF_SECTION_INDEX_SLOT_SIZE;
468
469 log_debug2("journal_cjf_idxt_load: reading '%s' PAGE tail at %x", jnl->journal_file_name, index_offset + tail_offset);
470
471 // the last serial is on the last slot
472
473 file_pool_seek(jnl->file, tail_offset, SEEK_CUR);
474 if(file_pool_readfully(jnl->file, &page_last_item, JOURNAL_CJF_PAGE_ITEM_SIZE) != JOURNAL_CJF_PAGE_ITEM_SIZE)
475 {
476 log_err("journal_cjf_idxt_load: '%s' is too corrupt to go on further reading PAGE tail at %x", jnl->journal_file_name, index_offset + CJF_SECTION_INDEX_SIZE - CJF_SECTION_INDEX_SLOT_HEAD - CJF_SECTION_INDEX_SLOT_SIZE);
477 break;
478 }
479
480 // if there is a next page ...
481
482 if(idx == size)
483 {
484 log_debug2("journal_cjf_idxt_load: growing IDXT table from %i to %i", size, size * 2);
485
486 journal_cjf_page_tbl_item *tmp;
487 MALLOC_OR_DIE(journal_cjf_page_tbl_item*, tmp, JOURNAL_CJF_PAGE_ITEM_SIZE * size * 2, JCJFTI_TAG);
488 memcpy(tmp, tbl, JOURNAL_CJF_PAGE_ITEM_SIZE * size);
489 if(tbl != tmp_tbl)
490 {
491 free(tbl);
492 }
493 tbl = tmp;
494 size *= 2;
495 }
496
497 tbl[idx].stream_file_offset = index_offset;
498 tbl[idx].ends_with_serial = page_last_item.ends_with_serial;
499
500 log_debug2("journal_cjf_idxt_load: IDXT[%3i] = {%8x, %u}", idx, index_offset, page_last_item.ends_with_serial);
501
502 page_serial = page_last_item.ends_with_serial;
503 page_read = TRUE;
504
505 ++idx;
506
507 index_offset = page_header.next_page_offset;
508 }
509 else
510 {
511 // an empty page should not exist
512
513 if(page_read)
514 {
515 if(serial_eq(page_serial, jnl->serial_end))
516 {
517 log_info("journal_cjf_idxt_load: got up to expected serial %i", page_serial);
518 }
519 else if(serial_lt(page_serial, jnl->serial_end))
520 {
521 log_err("journal_cjf_idxt_load: got up to serial %i, before the expected %i", page_serial, jnl->serial_end);
522 }
523 else if(serial_gt(page_serial, jnl->serial_end))
524 {
525 log_err("journal_cjf_idxt_load: got up to serial %i, after the expected %i", page_serial, jnl->serial_end);
526 }
527 }
528 else
529 {
530 log_err("journal_cjf_idxt_load: could not read the content of the journal");
531 }
532
533 break;
534 }
535 }
536 while(index_offset != 0);
537
538 log_debug1("journal_cjf_idxt_load: IDXT table has size %i", idx + 1);
539
540 // scan for an SOA record
541
542 journal_cjf_idxt_create(jnl, idx + 1);
543 memcpy(jnl->idxt.entries, tbl, JOURNAL_CJF_PAGE_ITEM_SIZE * idx);
544 jnl->idxt.count = idx;
545 }
546
547 /**
548 *
549 * Writes the indexes table (IDXT) to the disk, if needed.
550 * Updates the header on disk accordingly.
551 * Clears the "dirty" and "makred" flags.
552 *
553 * @param jnl
554 */
555
556 void
journal_cjf_idxt_flush(journal_cjf * jnl)557 journal_cjf_idxt_flush(journal_cjf *jnl)
558 {
559 // write the table on disk if not done already
560 if(!jnl->idxt.dirty)
561 {
562 return;
563 }
564
565 if(jnl->page_table_file_offset == 0)
566 {
567 log_debug("cjf: %{dnsname}: table index not set", jnl->origin);
568 return;
569 }
570
571 // write the table at the end
572
573 off_t end = file_pool_seek(jnl->file, jnl->page_table_file_offset, SEEK_SET);
574
575 if(end < 0)
576 {
577 int err = ERRNO_ERROR;
578
579 log_err("cjf: forward to end of PAGE chain failed: %r", err);
580
581 if(err == EBADF)
582 {
583 log_err("cjf: file has been closed before writing the summary");
584 }
585
586 logger_flush();
587
588 return;
589 }
590
591 output_stream fos;
592 output_stream bos;
593
594 journal_cjf_set_dirty(jnl);
595
596 log_debug3("cjf: flushing IDXT %u indexes at %08x", jnl->idxt.count, jnl->page_table_file_offset);
597
598 file_pool_file_output_stream_init(&fos, jnl->file);
599 file_pool_file_output_stream_set_full_writes(&fos, TRUE); // this makes the stream "write fully"
600 buffer_output_stream_init(&bos, &fos, 512);
601 output_stream_write(&bos, (const u8*)"IDXT", 4);
602 output_stream_write(&bos, (const u8*)&jnl->idxt.count , 2);
603 for(s16 idx = 0; idx < jnl->idxt.count; idx++)
604 {
605 output_stream_write(&bos, (const u8*)&jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size], CJF_IDXT_SLOT_SIZE);
606 }
607 output_stream_write(&bos, (const u8*)"END", 4); // yes, with the '\0' at the end
608 output_stream_flush(&bos);
609 file_pool_file_output_stream_detach(buffer_output_stream_get_filtered(&bos));
610 output_stream_close(&bos);
611
612 // write the table offset
613
614 journal_cjf_header_flush(jnl);
615
616 #if DO_SYNC
617 log_debug3("cjf: syncing to disk");
618
619 file_pool_flush(jnl->file);
620 #endif
621
622 jnl->idxt.dirty = FALSE;
623 jnl->idxt.marked = FALSE;
624
625 #if _BSD_SOURCE || _XOPEN_SOURCE >= 500 || _XOPEN_SOURCE && _XOPEN_SOURCE_EXTENDED || /* Since glibc 2.3.5: */ _POSIX_C_SOURCE >= 200112L
626 u32 file_size = jnl->page_table_file_offset + 4 + 2 + 4 + jnl->idxt.count * CJF_IDXT_SLOT_SIZE;
627 file_pool_resize(jnl->file, file_size);
628 #endif
629 }
630
631 /**
632 *
633 * Flushes the IDXT to disk if needed, then destroys the structure content.
634 *
635 * @param jnl
636 */
637
638 void
journal_cjf_idxt_destroy(journal_cjf * jnl)639 journal_cjf_idxt_destroy(journal_cjf *jnl)
640 {
641 journal_cjf_idxt_flush(jnl);
642
643 free(jnl->idxt.entries);
644 jnl->idxt.entries = NULL;
645
646 jnl->idxt.size = 0;
647 jnl->idxt.first = 0;
648 jnl->idxt.count = 0;
649 }
650
651 /**
652 * Updates the value of the last serial at current position in the PAGE
653 *
654 * @param jnl
655 * @param last_serial
656 */
657
658 void
journal_cjf_idxt_update_last_serial(journal_cjf * jnl,u32 last_serial)659 journal_cjf_idxt_update_last_serial(journal_cjf *jnl, u32 last_serial)
660 {
661 yassert(jnl->idxt.size > 0);
662 journal_cjf_idxt_tbl_item *entry;
663
664 entry = &jnl->idxt.entries[(jnl->idxt.first + jnl->idxt.count - 1) % jnl->idxt.size];
665
666 log_debug2("cjf: IDXT current (%i) PAGE serial from %08x to %08x", jnl->idxt.count - 1, entry->last_serial, last_serial);
667
668 entry->last_serial = last_serial;
669
670 jnl->idxt.dirty = TRUE;
671 }
672
673 /**
674 * Appends an PAGE table after the current one
675 *
676 * @param jcs
677 * @param size_hint
678 */
679
680 static void
journal_cjf_idxt_append_page_nogrow(journal_cjf * jnl)681 journal_cjf_idxt_append_page_nogrow(journal_cjf *jnl)
682 {
683 yassert(jnl->idxt.size > 0);
684 journal_cjf_idxt_tbl_item *entry;
685
686 jnl_page *page = &jnl->last_page; // last logical page on the (cycling) stream
687
688 u32 page_offset = page->file_offset; // physical position of the page
689
690 log_debug_jnl(jnl, "cjf: journal_cjf_idxt_append_page_nogrow: BEFORE");
691
692 yassert(page->count <= page->size);
693
694 page->size = page->count; // we are forcing the change of page (adding but not growing, thus losing the first page if needed)
695
696 if(jnl->idxt.count < jnl->idxt.size)
697 {
698 // there is still room left in the file : no need to grow, so no problem here
699
700 log_debug2("cjf: append PAGE at [%i] offset %u (%08x)", jnl->idxt.count, page->records_limit, page->records_limit);
701 // the entry is the next one (at position 'count'), modulo the size of the table
702 entry = &jnl->idxt.entries[(jnl->idxt.first + jnl->idxt.count) % jnl->idxt.size];
703 jnl->idxt.count++;
704
705 entry->last_serial = page->serial_end;
706 entry->file_offset = page->records_limit;
707 }
708 else
709 {
710 // there is no room left thus we will replace the first page (increasing the first slot position)
711 // overwrite of the start of the cyclic data, update the journal
712
713 /*
714 * No grow happens when the file is too big and we are about to loop
715 */
716
717 u32 first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 0);
718
719 log_debug2("cjf: append PAGE at [%i] offset %u (%08x), losing first PAGE", jnl->idxt.count, first_page_offset, first_page_offset);
720
721 entry = &jnl->idxt.entries[(jnl->idxt.first) % jnl->idxt.size];
722
723 yassert(jnl->first_page_offset == entry->file_offset);
724
725 // removes first page, adjusts current PAGE offset_limit
726
727 journal_cjf_remove_first_page(jnl); // will decrease the count and move the first
728
729 jnl->idxt.count++;
730
731 entry->last_serial = page->serial_end;
732 entry->file_offset = first_page_offset;
733
734 // update the section with the values for the next one
735 }
736
737 // update the section with the values for the next one
738
739 page->file_offset = entry->file_offset;
740 page->count = 0;
741 page->size = CJF_SECTION_INDEX_SLOT_COUNT;
742 page->records_limit = page->file_offset + CJF_SECTION_INDEX_SIZE;
743
744 if(page->file_offset >= jnl->first_page_offset)
745 {
746 page->file_offset_limit = jnl->file_maximum_size;
747 }
748 else
749 {
750 page->file_offset_limit = jnl->first_page_offset;
751 }
752
753 page->serial_start = entry->last_serial;
754 page->serial_end = entry->last_serial;
755
756 // update the next pointer of the previous PAGE
757
758 // CFJ_PAGE_CACHE ->
759 log_debug3("cjf: updating PAGE chain (@%08x = %08x)", page_offset, page->file_offset);
760
761 journal_cjf_page_tbl_header current_page_header;
762 journal_cjf_page_cache_read_header(jnl->file, page_offset, ¤t_page_header);
763 current_page_header.next_page_offset = page->file_offset;
764 journal_cjf_page_cache_write_header(jnl->file, page_offset, ¤t_page_header);
765
766 // writes an empty PAGE table for the current (new) PAGE
767
768 log_debug3("cjf: writing new empty PAGE");
769
770 journal_cjf_page_cache_write_new_header(jnl->file, page->file_offset);
771 // CFJ_PAGE_CACHE <-
772
773 // the IDXT had some changes that need flushing
774
775 jnl->idxt.dirty = TRUE;
776
777 // only mark the file about its changes once
778
779 if(!jnl->idxt.marked)
780 {
781 jnl->idxt.marked = TRUE;
782 }
783
784 journal_cjf_page_cache_flush(jnl->file);
785 journal_cjf_header_flush(jnl);
786
787 #if DO_SYNC
788 log_debug3("cjf: syncing to disk");
789
790 file_pool_flush(jnl->file);
791 #endif
792
793 log_debug_jnl(jnl, "cjf: journal_cjf_idxt_append_page_nogrow: AFTER");
794 }
795
796 /**
797 *
798 * Grows the IDTX table by one slot
799 *
800 * @param jnl
801 */
802
803 static void
journal_cjf_idxt_grow(journal_cjf * jnl)804 journal_cjf_idxt_grow(journal_cjf *jnl)
805 {
806 yassert(jnl->idxt.size > 0);
807
808 log_debug2("cjf: growing IDXT table to %u slots", jnl->idxt.size + 1);
809
810 journal_cjf_idxt_tbl_item *tmp;
811 MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, tmp, sizeof(journal_cjf_idxt_tbl_item) * (jnl->idxt.size + 1), JCJFITI_TAG);
812
813 for(s16 idx = 0; idx < jnl->idxt.count; idx++)
814 {
815 tmp[idx] = jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
816 }
817
818 ++jnl->idxt.size;
819
820 for(s16 idx = jnl->idxt.count; idx < jnl->idxt.size; idx++)
821 {
822 tmp[idx].last_serial = 0;
823 tmp[idx].file_offset = 0;
824 }
825
826 log_debug_jnl(jnl, "cjf: journal_cjf_idxt_grow: BEFORE");
827
828 free(jnl->idxt.entries);
829 jnl->idxt.entries = tmp;
830 jnl->idxt.first = 0;
831
832 log_debug_jnl(jnl, "cjf: journal_cjf_idxt_grow: AFTER");
833
834 }
835
836 /**
837 * Ensures there is at least one empty available PAGE slot in the IDTX
838 *
839 * @param jnl
840 */
841
842 static void
journal_cjf_idxt_ensure_growth(journal_cjf * jnl)843 journal_cjf_idxt_ensure_growth(journal_cjf *jnl)
844 {
845 log_debug2("cjf: ensuring IDXT growth");
846
847 if(jnl->idxt.count == jnl->idxt.size)
848 {
849 journal_cjf_idxt_grow(jnl);
850 }
851 }
852
853 /**
854 *
855 * Prevent the IDXT table from growing further
856 *
857 * @param jnl
858 */
859
860 static void
journal_cjf_idxt_fix_size(journal_cjf * jnl)861 journal_cjf_idxt_fix_size(journal_cjf *jnl)
862 {
863 yassert(jnl->idxt.size > 0);
864 yassert(jnl->idxt.size >= jnl->idxt.count);
865
866 if(jnl->idxt.size != jnl->idxt.count)
867 {
868 log_debug2("cjf: fixing IDXT size from %u to %u", jnl->idxt.size, jnl->idxt.count);
869
870 journal_cjf_idxt_tbl_item *tmp;
871 MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, tmp, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.count, JCJFITI_TAG);
872
873 for(s16 i = 0; i < jnl->idxt.count; ++i)
874 {
875 tmp[i] = jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
876 }
877
878 #if DEBUG
879 memset(jnl->idxt.entries, 0xfe, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size);
880 #endif
881 free(jnl->idxt.entries);
882
883 jnl->idxt.entries = tmp;
884
885 jnl->idxt.first = 0;
886 jnl->idxt.size = jnl->idxt.count;
887 }
888 else
889 {
890 log_debug2("cjf: fixing IDXT size to %u (nothing to do)", jnl->idxt.count);
891 }
892 }
893
894 /**
895 * Appends an PAGE after this one
896 *
897 * @param jnl
898 */
899
900 void
journal_cjf_idxt_append_page(journal_cjf * jnl)901 journal_cjf_idxt_append_page(journal_cjf *jnl)
902 {
903 // where are we in the file ?
904
905 log_debug2("cjf: PAGE: @%08x -> %08x ... %08x [%08x; %08x]",
906 jnl->last_page.file_offset, jnl->last_page.records_limit, jnl->last_page.file_offset_limit, jnl->last_page.serial_start, jnl->last_page.serial_end);
907
908 // if the PAGE (offset) is before the first PAGE (offset)
909
910 if(jnl->last_page.file_offset < jnl->first_page_offset)
911 {
912 log_debug2("cjf: IDXT adding PAGE (middle of the file)");
913
914 // we are in the middle of the physical file (meaning, physically before the first PAGE in the logical order)
915
916 yassert(jnl->last_page.records_limit <= jnl->first_page_offset);
917
918 // ensure there is enough room after us
919 // while there is not enough room, remove one page
920
921 while(jnl->first_page_offset - jnl->last_page.records_limit < CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE)
922 {
923 journal_cjf_remove_first_page(jnl);
924
925 if(jnl->last_page.file_offset >= jnl->first_page_offset)
926 {
927 break;
928 }
929 }
930
931 // we made room or we reached a limit before we got enough
932
933 yassert(jnl->first_page_offset - jnl->last_page.records_limit >= CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE);
934
935 // make the IDXT grow if it full already
936
937 journal_cjf_idxt_ensure_growth(jnl);
938
939 // create a new page at jnl->page.offset_next
940 }
941 else
942 {
943 // we are at the end of the physical file
944
945 log_debug2("cjf: IDXT adding PAGE (end of the file)");
946
947 /// @note 20150210 edf -- A journal cannot loop with only one PAGE
948 // if it is expected to go beyond the maximum size with the next update, prevent the growth of the idtx table
949 // if we don't have at least two PAGE, then continue to grow the IDXT
950
951 const bool has_at_least_two_pages = (jnl->idxt.count > 1);
952
953 const bool too_close_to_the_file_size_limit = (jnl->last_page.records_limit + CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE > jnl->file_maximum_size);
954
955 if(has_at_least_two_pages && too_close_to_the_file_size_limit)
956 {
957 journal_cjf_idxt_fix_size(jnl);
958 }
959 else
960 {
961 journal_cjf_idxt_ensure_growth(jnl);
962 }
963
964 // create a new page in the idxt
965 }
966
967 journal_cjf_idxt_append_page_nogrow(jnl);
968 }
969
970 /*
971 scans all the PAGE entries from the IDXT and get the one that contains the serial
972 */
973
974 ya_result
journal_cjf_idxt_get_page_index_from_serial(const journal_cjf * jnl,u32 serial)975 journal_cjf_idxt_get_page_index_from_serial(const journal_cjf *jnl, u32 serial)
976 {
977 u32 prev = jnl->serial_begin;
978
979 if(serial_lt(serial, prev))
980 {
981 return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
982 }
983
984 u32 prev_serial = jnl->serial_begin;
985
986 s16 n = jnl->idxt.count;
987 for(s16 i = 0; i < n; i++)
988 {
989 journal_cjf_idxt_tbl_item *entry;
990 entry = &jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
991 // the last serial of an entry is the one of the last SOA added on it
992 // we want to start after that one
993 if(serial_lt(serial, entry->last_serial))
994 {
995 log_debug1("journal_cjf_idxt_get_page_index_from_serial(%s, %d) returning %i (%i -> %i)", jnl->journal_file_name, serial, i, prev_serial, entry->last_serial);
996 return i;
997 }
998 prev_serial = entry->last_serial;
999 }
1000
1001 return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1002 }
1003
1004 u32
journal_cjf_idxt_get_page_serial_from_index(const journal_cjf * jnl,int idx)1005 journal_cjf_idxt_get_page_serial_from_index(const journal_cjf *jnl, int idx)
1006 {
1007 if(idx > 0)
1008 {
1009 journal_cjf_idxt_tbl_item *prev_entry;
1010 prev_entry = &jnl->idxt.entries[(jnl->idxt.first + idx - 1) % jnl->idxt.size];
1011 return prev_entry->last_serial;
1012 }
1013 else
1014 {
1015 return jnl->serial_begin;
1016 }
1017 }
1018
1019 /**
1020 * Returns the page index containing the serial, and optionally its position in the file.
1021 *
1022 * @param jnl
1023 * @param serial
1024 * @param file_offset
1025 * @return
1026 */
1027
1028 ya_result
journal_cjf_idxt_get_page_offset_from_serial(const journal_cjf * jnl,u32 serial,u32 * file_offset)1029 journal_cjf_idxt_get_page_offset_from_serial(const journal_cjf *jnl, u32 serial, u32 *file_offset)
1030 {
1031 u32 prev_serial = jnl->serial_begin;
1032
1033 // ensure the journal starts at least from the serial we are looking for
1034
1035 if(serial_lt(serial, prev_serial))
1036 {
1037 return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1038 }
1039
1040 s16 n = jnl->idxt.count;
1041 for(s16 i = 0; i < n; i++)
1042 {
1043 journal_cjf_idxt_tbl_item *entry;
1044 entry = &jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
1045
1046 // entry->last_serial is the last_serial TO, so the start of the next page
1047
1048 if(serial_lt(serial, entry->last_serial))
1049 {
1050 log_debug1("journal_cjf_idxt_get_page_index_from_serial(%s, %d) returning %i (%i -> %i)",
1051 jnl->journal_file_name, serial, i, prev_serial, entry->last_serial);
1052 if(file_offset != NULL)
1053 {
1054 *file_offset = entry->file_offset;
1055 }
1056 return i;
1057 }
1058
1059 prev_serial = entry->last_serial;
1060 }
1061
1062 // too far ...
1063
1064 return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1065 }
1066
1067 ya_result
journal_cjf_idxt_get_page_serial_to(const journal_cjf * jnl,int idx)1068 journal_cjf_idxt_get_page_serial_to(const journal_cjf *jnl, int idx)
1069 {
1070 journal_cjf_idxt_tbl_item *entry;
1071 entry = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
1072 return entry->last_serial;
1073 }
1074
1075 u32
journal_cjf_idxt_get_page_offset(const journal_cjf * jnl,int idx)1076 journal_cjf_idxt_get_page_offset(const journal_cjf *jnl, int idx)
1077 {
1078 journal_cjf_idxt_tbl_item *entry;
1079 entry = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
1080 return entry->file_offset;
1081 }
1082
1083 #endif
1084