1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 /*******************************************************************************
36  *
37  * Indexes table handling functions
38  *
39  * These are the tables containing offsets to a serial
40  * They are linked together
41  * They are often referenced by an unique table of indexes
42  *
43  ******************************************************************************/
44 
45 #define JOURNAL_CJF_BASE 1
46 
47 #include "dnsdb/dnsdb-config.h"
48 
49 #define ZDB_JOURNAL_CODE 1
50 
51 #include "dnsdb/journal.h"
52 
53 #if JOURNAL_CJF_ENABLED
54 
55 #include "dnsdb/journal-cjf-page-cache.h"
56 #include "dnsdb/journal-cjf-idxt.h"
57 #include "dnsdb/journal-cjf-common.h"
58 
59 #include <dnscore/logger.h>
60 #include <dnscore/file_input_stream.h>
61 #include <dnscore/buffer_input_stream.h>
62 #include <dnscore/file_output_stream.h>
63 #include <dnscore/buffer_output_stream.h>
64 #include <dnscore/fdtools.h>
65 #include <dnscore/serial.h>
66 
67 extern logger_handle* g_database_logger;
68 #define MODULE_MSG_HANDLE g_database_logger
69 
70 #define CJF_IDXT_SLOT_SIZE 8
71 
72 
73 const journal_cjf_idxt_tbl_item*
journal_cjf_idxt_get_entry(const journal_cjf * jnl,s16 index)74 journal_cjf_idxt_get_entry(const journal_cjf *jnl, s16 index)
75 {
76     yassert(index >= 0);yassert(jnl->idxt.first >= 0);
77 
78     journal_cjf_idxt_tbl_item *entry;
79     entry = &jnl->idxt.entries[(jnl->idxt.first + index) % jnl->idxt.size];
80     return entry;
81 }
82 
83 s16
journal_cjf_idxt_size(const journal_cjf * jnl)84 journal_cjf_idxt_size(const journal_cjf *jnl)
85 {
86     return jnl->idxt.count;
87 }
88 
89 s16
journal_cjf_idxt_size_max(const journal_cjf * jnl)90 journal_cjf_idxt_size_max(const journal_cjf *jnl)
91 {
92     return jnl->idxt.size;
93 }
94 
95 /**
96  *
97  * Returns the file offset value at index in the current IDXT
98  *
99  * @param jnl
100  * @param index
101  * @return
102  */
103 
104 u32
journal_cjf_idxt_get_file_offset(const journal_cjf * jnl,s16 index)105 journal_cjf_idxt_get_file_offset(const journal_cjf *jnl, s16 index)
106 {
107     u32 file_offset = journal_cjf_idxt_get_entry(jnl, index)->file_offset;
108     return file_offset;
109 }
110 
111 u32
journal_cjf_idxt_get_last_file_offset(const journal_cjf * jnl)112 journal_cjf_idxt_get_last_file_offset(const journal_cjf *jnl)
113 {
114     if(jnl->idxt.count > 0)
115     {
116         u32 n = journal_cjf_idxt_get_file_offset(jnl, jnl->idxt.count - 1);
117         return n;
118     }
119     else
120     {
121         return 0;
122     }
123 }
124 
125 /**
126  *
127  * Returns the last serial number value at index in the IDXT
128  *
129  * @param jnl
130  * @param index
131  * @return
132  */
133 
134 u32
journal_cjf_idxt_get_last_serial(const journal_cjf * jnl,s16 index)135 journal_cjf_idxt_get_last_serial(const journal_cjf *jnl, s16 index)
136 {
137     u32 last_serial = journal_cjf_idxt_get_entry(jnl, index)->last_serial;
138     return last_serial;
139 }
140 
141 /**
142  * Creates an empty table of indexes (IDXT) for the journal, with a minimum number of entries.
143  * Nothing is written to disk.
144  *
145  * @param jnl
146  * @param entries
147  */
148 
149 void
journal_cjf_idxt_create(journal_cjf * jnl,s16 entries)150 journal_cjf_idxt_create(journal_cjf *jnl, s16 entries)
151 {
152     yassert(jnl->idxt.size == 0);
153     yassert(entries >= 0);
154 
155     jnl->idxt.count = 1;
156     jnl->idxt.first = 0;
157     jnl->idxt.size = entries + 1;
158     jnl->idxt.dirty = TRUE;
159     jnl->idxt.marked = 0;
160 
161     MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, jnl->idxt.entries, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size, JCJFITI_TAG);
162     ZEROMEMORY(jnl->idxt.entries, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size);
163 
164     jnl->idxt.entries[0].last_serial = jnl->serial_begin;
165     jnl->idxt.entries[0].file_offset = jnl->last_page.file_offset;
166     jnl->first_page_offset = jnl->last_page.file_offset;
167 }
168 
169 /**
170  * Verifies the page table chain
171  * To be used for small discrepancies between the header and the page table (idxt)
172  *
173  * @param jnl
174  * @return
175  */
176 
177 static ya_result
journal_cjf_idxt_verify(journal_cjf * jnl)178 journal_cjf_idxt_verify(journal_cjf *jnl)
179 {
180     // check the values of the pages
181     // serials have to be ordered in serial arithmetics
182     // pages are supposed to start after each other except a looping ones that goes after the header
183 
184     // if the verify fails, a scan may be needed (other procedure)
185 
186     journal_cjf_page_tbl_header page_hdr;
187     u32 previous_page_offset;
188     u32 stream_end_offset;
189     u32 next_page_offset; // uninitialised false positive: either size is <= 0, skipping for & if, either it's >= 0 and page_hrd is set and initialises next_page_offset
190     u32 prev_serial = jnl->serial_begin;
191     int loops = 0;
192     bool error = FALSE;
193     bool has_page_after_header = FALSE;
194 
195     s16 size = journal_cjf_idxt_size(jnl);
196 
197     for(int page = 0; page < size; ++page)
198     {
199         const journal_cjf_idxt_tbl_item* entry = journal_cjf_idxt_get_entry(jnl, page);
200 
201         has_page_after_header |= entry->file_offset == CJF_HEADER_SIZE;
202 
203         if(page > 0)
204         {
205             if(entry->file_offset != next_page_offset) // gcc false positive: next_page_offset is initialised when page == 0
206             {
207                 // page do not start at the expected position
208                 log_err("cjf: %{dnsname}: page[%i] starts at an unexpected position (%u != expected %u)", jnl->origin, page, entry->file_offset, next_page_offset);
209                 error = TRUE;
210             }
211 
212             if(entry->file_offset == previous_page_offset)
213             {
214                 // broken chain
215                 log_err("cjf: %{dnsname}: page[%i] is a duplicate at position %u", jnl->origin, page, entry->file_offset);
216                 error = TRUE;
217             }
218 
219             if(entry->file_offset > previous_page_offset)
220             {
221                 if(entry->file_offset != stream_end_offset)
222                 {
223                     // suspicious hole in the file
224                     log_err("cjf: %{dnsname}: page[%i] is %u bytes after the expected position", jnl->origin, page, entry->file_offset - stream_end_offset);
225                     error = TRUE;
226                 }
227             }
228             else
229             {
230                 // just looped ...
231 
232                 if(loops == 0)
233                 {
234                     if(entry->file_offset > CJF_HEADER_SIZE)
235                     {
236                         // looped at an unexpected position
237                         log_err("cjf: %{dnsname}: page[%i] looped at an unexpected position (%u != expected %u)", jnl->origin, page, entry->file_offset, CJF_HEADER_SIZE);
238                         error = TRUE;
239                     }
240                     else if(entry->file_offset > CJF_HEADER_SIZE)
241                     {
242                         // looped at an unexpected position
243                         log_err("cjf: %{dnsname}: page[%i] looped into the header position (%u < %u)", jnl->origin, page, entry->file_offset, CJF_HEADER_SIZE);
244                         error = TRUE;
245                     }
246 
247                     loops = 1;
248                 }
249                 else
250                 {
251                     // should only have looped once
252                     log_err("cjf: %{dnsname}: page[%i] looped for a second time", jnl->origin, page);
253                     error = TRUE;
254                 }
255             }
256 
257             if(error)
258             {
259                 // got at least one error
260                 return ERROR;
261             }
262         }
263 
264         ssize_t pos = file_pool_seek(jnl->file, entry->file_offset, SEEK_SET);
265 
266         if(pos < 0)
267         {
268             // invalid position (as EBADF should not happen)
269             ya_result ret = ERRNO_ERROR;
270 
271             log_err("cjf: %{dnsname}: page[%i] seek at %u returned %r", jnl->origin, page, entry->file_offset, ret);
272 
273             return ret;
274         }
275 
276         int len = file_pool_readfully(jnl->file, &page_hdr, CJF_SECTION_INDEX_SLOT_HEAD);
277 
278         if(len != CJF_SECTION_INDEX_SLOT_HEAD)
279         {
280             if(len >= 0)
281             {
282                 log_err("cjf: %{dnsname}: page[%i] short count reading the header (%u < %u)", jnl->origin, page, len, CJF_SECTION_INDEX_SLOT_HEAD);
283                 return ERROR; // short
284             }
285             else
286             {
287                 log_err("cjf: %{dnsname}: page[%i] error reading the header: %r", jnl->origin, page, len);
288                 return len; // other error
289             }
290         }
291 
292         if(page_hdr.magic != CJF_PAGE_MAGIC)
293         {
294             // page is corrupted
295             log_err("cjf: %{dnsname}: page[%i] corrupted magic", jnl->origin, page);
296             return ERROR;
297         }
298         if(page_hdr.count > page_hdr.size)
299         {
300             // page is corrupted
301             log_err("cjf: %{dnsname}: page[%i] says to contain more than allowed", jnl->origin, page);
302             return ERROR;
303         }
304 
305         if(page_hdr.count == 0)
306         {
307             // empty page (warning)
308             log_warn("cjf: %{dnsname}: page[%i] is empty", jnl->origin, page);
309         }
310 
311         if(serial_gt(prev_serial, entry->last_serial))
312         {
313             // suspicious serial backward jump
314             log_err("cjf: %{dnsname}: page[%i] serial jumped back from %u to %u", jnl->origin, page, prev_serial, entry->last_serial);
315         }
316         else if(serial_eq(prev_serial, entry->last_serial))
317         {
318             // suspicious serial standstill
319             log_err("cjf: %{dnsname}: page[%i] serial didn't changed from %u", jnl->origin, page, prev_serial);
320         }
321 
322         previous_page_offset = entry->file_offset;
323         next_page_offset = page_hdr.next_page_offset; // next page, 0 for the last one
324         stream_end_offset = page_hdr.stream_end_offset; // start of next page, start of page table, or 0 for the last in the chain
325     }
326 
327     if(size > 0)
328     {
329         if(next_page_offset != 0) // gcc false positive: size > 0 => next_page_offset has been set to page_hdr.next_page_offset (read from the disk)
330         {
331             // chain end was not marked
332             log_err("cjf: %{dnsname}: page[%i] is last but points to a next at %u", jnl->origin, size - 1, next_page_offset);
333             return ERROR;
334         }
335         if(!has_page_after_header)
336         {
337             // no page at an obvious position
338             log_err("cjf: %{dnsname}: page table has no page at position %u", jnl->origin, CJF_HEADER_SIZE);
339             return ERROR;
340         }
341     }
342     else
343     {
344         // table is empty
345         log_err("cjf: %{dnsname}: page table is empty", jnl->origin);
346         return ERROR;
347     }
348 
349     return SUCCESS;
350 }
351 
352 /**
353  * Loads (or rebuilds) the table of indexes (IDXT)
354  *
355  * @param jnl
356  */
357 
358 void
journal_cjf_idxt_load(journal_cjf * jnl)359 journal_cjf_idxt_load(journal_cjf *jnl)
360 {
361     if(jnl->idxt.entries != NULL)
362     {
363         // already loaded ...
364         return;
365     }
366 
367     // the file is opened
368 
369     if(jnl->page_table_file_offset != 0)
370     {
371         log_debug1("journal_cjf_idxt_load: loading stored IDXT from '%s'", jnl->journal_file_name);
372 
373         // load
374         file_pool_seek(jnl->file, jnl->page_table_file_offset, SEEK_SET);
375 
376         input_stream fis;
377         input_stream bis;
378         file_pool_file_input_stream_init(&fis, jnl->file);
379 
380         buffer_input_stream_init(&bis, &fis, 512);
381         u8 magic[4];
382         input_stream_read(&bis, magic, 4);
383         u32 *magic_u32p = (u32*)&magic[0];
384         if(*magic_u32p == CJF_IDXT_MAGIC)
385         {
386             s16 count;
387             input_stream_read(&bis, (u8*)&count , 2);
388 
389             journal_cjf_idxt_create(jnl, count + 1);
390 
391             input_stream_read(&bis, (u8*)&jnl->idxt.entries[0], count * CJF_IDXT_SLOT_SIZE);
392 
393             file_pool_file_input_stream_detach(buffer_input_stream_get_filtered(&bis));
394             input_stream_close(&bis);
395 
396             jnl->idxt.count = count;
397 
398             u32 first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 0);
399 
400             if(jnl->first_page_offset != first_page_offset)
401             {
402                 // discrepancy : check the IDXT is valid
403 
404                 if(ISOK(journal_cjf_idxt_verify(jnl)))
405                 {
406                     // the header is wrong, update it
407 
408                     jnl->first_page_offset = first_page_offset;
409                 }
410             }
411 
412             return;
413         }
414 
415         file_pool_file_input_stream_detach(buffer_input_stream_get_filtered(&bis));
416         input_stream_close(&bis);
417 
418         // ERROR, need to rebuild
419     }
420 
421     log_debug1("journal_cjf_idxt_load: rebuilding IDXT from '%s', following the PAGE", jnl->journal_file_name);
422 
423     // rebuild
424 
425     journal_cjf_page_tbl_item *tbl;
426     u32 size = 512;
427     journal_cjf_page_tbl_item tmp_tbl[512];
428     tbl = tmp_tbl;
429 
430     if(jnl->first_page_offset < JOURNAL_CJF_PAGE_HEADER_SIZE)
431     {
432         // the PAGE chain has been lost : start from HEAD and follow the chain
433         // then after the 0, scan from the furthest known byte for PAGE+offset
434         // and follow until the chain points back to offset sizeof(head)
435     }
436 
437     // read the PAGE chain from the file (no caching)
438 
439     u32 index_offset = jnl->first_page_offset;
440     //u32 current_serial = jnl->serial_begin;
441     journal_cjf_page_tbl_header page_header;
442     journal_cjf_page_tbl_item page_last_item;
443     s16 idx = 0;
444     u32 page_serial = 0;
445     bool page_read = FALSE;
446 
447     do
448     {
449         // move to the page offset and read the header
450 
451         log_debug2("journal_cjf_idxt_load: reading '%s' PAGE header at %x", jnl->journal_file_name, index_offset);
452 
453         file_pool_seek(jnl->file, index_offset, SEEK_SET);
454         if(file_pool_readfully(jnl->file, &page_header, JOURNAL_CJF_PAGE_HEADER_SIZE) != JOURNAL_CJF_PAGE_HEADER_SIZE) // next offset
455         {
456             log_err("journal_cjf_idxt_load: '%s' is too corrupt to go on further reading PAGE header at %x", jnl->journal_file_name, index_offset);
457             break;
458         }
459         /*
460         if(page_header.magic != CJF_PAGE_MAGIC)
461         {
462             // corrupt
463         }
464         */
465         if(page_header.count > 0)
466         {
467             u32 tail_offset = (page_header.count - 1) * CJF_SECTION_INDEX_SLOT_SIZE;
468 
469             log_debug2("journal_cjf_idxt_load: reading '%s' PAGE tail at %x", jnl->journal_file_name, index_offset + tail_offset);
470 
471             // the last serial is on the last slot
472 
473             file_pool_seek(jnl->file, tail_offset, SEEK_CUR);
474             if(file_pool_readfully(jnl->file, &page_last_item, JOURNAL_CJF_PAGE_ITEM_SIZE) != JOURNAL_CJF_PAGE_ITEM_SIZE)
475             {
476                 log_err("journal_cjf_idxt_load: '%s' is too corrupt to go on further reading PAGE tail at %x", jnl->journal_file_name, index_offset + CJF_SECTION_INDEX_SIZE - CJF_SECTION_INDEX_SLOT_HEAD - CJF_SECTION_INDEX_SLOT_SIZE);
477                 break;
478             }
479 
480             // if there is a next page ...
481 
482             if(idx == size)
483             {
484                 log_debug2("journal_cjf_idxt_load: growing IDXT table from %i to %i", size, size * 2);
485 
486                 journal_cjf_page_tbl_item *tmp;
487                 MALLOC_OR_DIE(journal_cjf_page_tbl_item*, tmp, JOURNAL_CJF_PAGE_ITEM_SIZE * size * 2, JCJFTI_TAG);
488                 memcpy(tmp, tbl, JOURNAL_CJF_PAGE_ITEM_SIZE * size);
489                 if(tbl != tmp_tbl)
490                 {
491                     free(tbl);
492                 }
493                 tbl = tmp;
494                 size *= 2;
495             }
496 
497             tbl[idx].stream_file_offset = index_offset;
498             tbl[idx].ends_with_serial = page_last_item.ends_with_serial;
499 
500             log_debug2("journal_cjf_idxt_load: IDXT[%3i] = {%8x, %u}", idx, index_offset, page_last_item.ends_with_serial);
501 
502             page_serial = page_last_item.ends_with_serial;
503             page_read = TRUE;
504 
505             ++idx;
506 
507             index_offset = page_header.next_page_offset;
508         }
509         else
510         {
511             // an empty page should not exist
512 
513             if(page_read)
514             {
515                 if(serial_eq(page_serial, jnl->serial_end))
516                 {
517                     log_info("journal_cjf_idxt_load: got up to expected serial %i", page_serial);
518                 }
519                 else if(serial_lt(page_serial, jnl->serial_end))
520                 {
521                     log_err("journal_cjf_idxt_load: got up to serial %i, before the expected %i", page_serial, jnl->serial_end);
522                 }
523                 else if(serial_gt(page_serial, jnl->serial_end))
524                 {
525                     log_err("journal_cjf_idxt_load: got up to serial %i, after the expected %i", page_serial, jnl->serial_end);
526                 }
527             }
528             else
529             {
530                 log_err("journal_cjf_idxt_load: could not read the content of the journal");
531             }
532 
533             break;
534         }
535     }
536     while(index_offset != 0);
537 
538     log_debug1("journal_cjf_idxt_load: IDXT table has size %i", idx + 1);
539 
540     // scan for an SOA record
541 
542     journal_cjf_idxt_create(jnl, idx + 1);
543     memcpy(jnl->idxt.entries, tbl, JOURNAL_CJF_PAGE_ITEM_SIZE * idx);
544     jnl->idxt.count = idx;
545 }
546 
547 /**
548  *
549  * Writes the indexes table (IDXT) to the disk, if needed.
550  * Updates the header on disk accordingly.
551  * Clears the "dirty" and "makred" flags.
552  *
553  * @param jnl
554  */
555 
556 void
journal_cjf_idxt_flush(journal_cjf * jnl)557 journal_cjf_idxt_flush(journal_cjf *jnl)
558 {
559     // write the table on disk if not done already
560     if(!jnl->idxt.dirty)
561     {
562         return;
563     }
564 
565     if(jnl->page_table_file_offset == 0)
566     {
567         log_debug("cjf: %{dnsname}: table index not set", jnl->origin);
568         return;
569     }
570 
571     // write the table at the end
572 
573     off_t end = file_pool_seek(jnl->file, jnl->page_table_file_offset, SEEK_SET);
574 
575     if(end < 0)
576     {
577         int err = ERRNO_ERROR;
578 
579         log_err("cjf: forward to end of PAGE chain failed: %r", err);
580 
581         if(err == EBADF)
582         {
583             log_err("cjf: file has been closed before writing the summary");
584         }
585 
586         logger_flush();
587 
588         return;
589     }
590 
591     output_stream fos;
592     output_stream bos;
593 
594     journal_cjf_set_dirty(jnl);
595 
596     log_debug3("cjf: flushing IDXT %u indexes at %08x", jnl->idxt.count, jnl->page_table_file_offset);
597 
598     file_pool_file_output_stream_init(&fos, jnl->file);
599     file_pool_file_output_stream_set_full_writes(&fos, TRUE);      // this makes the stream "write fully"
600     buffer_output_stream_init(&bos, &fos, 512);
601     output_stream_write(&bos, (const u8*)"IDXT", 4);
602     output_stream_write(&bos, (const u8*)&jnl->idxt.count , 2);
603     for(s16 idx = 0; idx < jnl->idxt.count; idx++)
604     {
605         output_stream_write(&bos, (const u8*)&jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size], CJF_IDXT_SLOT_SIZE);
606     }
607     output_stream_write(&bos, (const u8*)"END", 4); // yes, with the '\0' at the end
608     output_stream_flush(&bos);
609     file_pool_file_output_stream_detach(buffer_output_stream_get_filtered(&bos));
610     output_stream_close(&bos);
611 
612     // write the table offset
613 
614     journal_cjf_header_flush(jnl);
615 
616 #if DO_SYNC
617     log_debug3("cjf: syncing to disk");
618 
619     file_pool_flush(jnl->file);
620 #endif
621 
622     jnl->idxt.dirty = FALSE;
623     jnl->idxt.marked = FALSE;
624 
625 #if _BSD_SOURCE || _XOPEN_SOURCE >= 500 || _XOPEN_SOURCE && _XOPEN_SOURCE_EXTENDED || /* Since glibc 2.3.5: */ _POSIX_C_SOURCE >= 200112L
626     u32 file_size = jnl->page_table_file_offset + 4 + 2 + 4 + jnl->idxt.count * CJF_IDXT_SLOT_SIZE;
627     file_pool_resize(jnl->file, file_size);
628 #endif
629 }
630 
631 /**
632  *
633  * Flushes the IDXT to disk if needed, then destroys the structure content.
634  *
635  * @param jnl
636  */
637 
638 void
journal_cjf_idxt_destroy(journal_cjf * jnl)639 journal_cjf_idxt_destroy(journal_cjf *jnl)
640 {
641     journal_cjf_idxt_flush(jnl);
642 
643     free(jnl->idxt.entries);
644     jnl->idxt.entries = NULL;
645 
646     jnl->idxt.size = 0;
647     jnl->idxt.first = 0;
648     jnl->idxt.count = 0;
649 }
650 
651 /**
652  * Updates the value of the last serial at current position in the PAGE
653  *
654  * @param jnl
655  * @param last_serial
656  */
657 
658 void
journal_cjf_idxt_update_last_serial(journal_cjf * jnl,u32 last_serial)659 journal_cjf_idxt_update_last_serial(journal_cjf *jnl, u32 last_serial)
660 {
661     yassert(jnl->idxt.size > 0);
662     journal_cjf_idxt_tbl_item *entry;
663 
664     entry = &jnl->idxt.entries[(jnl->idxt.first + jnl->idxt.count - 1) % jnl->idxt.size];
665 
666     log_debug2("cjf: IDXT current (%i) PAGE serial from %08x to %08x", jnl->idxt.count - 1, entry->last_serial, last_serial);
667 
668     entry->last_serial = last_serial;
669 
670     jnl->idxt.dirty = TRUE;
671 }
672 
673 /**
674  * Appends an PAGE table after the current one
675  *
676  * @param jcs
677  * @param size_hint
678  */
679 
680 static void
journal_cjf_idxt_append_page_nogrow(journal_cjf * jnl)681 journal_cjf_idxt_append_page_nogrow(journal_cjf *jnl)
682 {
683     yassert(jnl->idxt.size > 0);
684     journal_cjf_idxt_tbl_item *entry;
685 
686     jnl_page *page = &jnl->last_page; // last logical page on the (cycling) stream
687 
688     u32 page_offset = page->file_offset; // physical position of the page
689 
690     log_debug_jnl(jnl, "cjf: journal_cjf_idxt_append_page_nogrow: BEFORE");
691 
692     yassert(page->count <= page->size);
693 
694     page->size = page->count; // we are forcing the change of page (adding but not growing, thus losing the first page if needed)
695 
696     if(jnl->idxt.count < jnl->idxt.size)
697     {
698         // there is still room left in the file : no need to grow, so no problem here
699 
700         log_debug2("cjf: append PAGE at [%i] offset %u (%08x)", jnl->idxt.count, page->records_limit, page->records_limit);
701         // the entry is the next one (at position 'count'), modulo the size of the table
702         entry = &jnl->idxt.entries[(jnl->idxt.first + jnl->idxt.count) % jnl->idxt.size];
703         jnl->idxt.count++;
704 
705         entry->last_serial = page->serial_end;
706         entry->file_offset = page->records_limit;
707     }
708     else
709     {
710         // there is no room left thus we will replace the first page (increasing the first slot position)
711         // overwrite of the start of the cyclic data, update the journal
712 
713         /*
714          * No grow happens when the file is too big and we are about to loop
715          */
716 
717         u32 first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 0);
718 
719         log_debug2("cjf: append PAGE at [%i] offset %u (%08x), losing first PAGE", jnl->idxt.count, first_page_offset, first_page_offset);
720 
721         entry = &jnl->idxt.entries[(jnl->idxt.first) % jnl->idxt.size];
722 
723         yassert(jnl->first_page_offset == entry->file_offset);
724 
725         // removes first page, adjusts current PAGE offset_limit
726 
727         journal_cjf_remove_first_page(jnl); // will decrease the count and move the first
728 
729         jnl->idxt.count++;
730 
731         entry->last_serial = page->serial_end;
732         entry->file_offset = first_page_offset;
733 
734         // update the section with the values for the next one
735     }
736 
737     // update the section with the values for the next one
738 
739     page->file_offset = entry->file_offset;
740     page->count = 0;
741     page->size = CJF_SECTION_INDEX_SLOT_COUNT;
742     page->records_limit = page->file_offset + CJF_SECTION_INDEX_SIZE;
743 
744     if(page->file_offset >= jnl->first_page_offset)
745     {
746         page->file_offset_limit = jnl->file_maximum_size;
747     }
748     else
749     {
750         page->file_offset_limit = jnl->first_page_offset;
751     }
752 
753     page->serial_start = entry->last_serial;
754     page->serial_end = entry->last_serial;
755 
756     // update the next pointer of the previous PAGE
757 
758     // CFJ_PAGE_CACHE ->
759     log_debug3("cjf: updating PAGE chain (@%08x = %08x)", page_offset, page->file_offset);
760 
761     journal_cjf_page_tbl_header current_page_header;
762     journal_cjf_page_cache_read_header(jnl->file, page_offset, &current_page_header);
763     current_page_header.next_page_offset = page->file_offset;
764     journal_cjf_page_cache_write_header(jnl->file, page_offset, &current_page_header);
765 
766     // writes an empty PAGE table for the current (new) PAGE
767 
768     log_debug3("cjf: writing new empty PAGE");
769 
770     journal_cjf_page_cache_write_new_header(jnl->file, page->file_offset);
771     // CFJ_PAGE_CACHE <-
772 
773     // the IDXT had some changes that need flushing
774 
775     jnl->idxt.dirty = TRUE;
776 
777     // only mark the file about its changes once
778 
779     if(!jnl->idxt.marked)
780     {
781         jnl->idxt.marked = TRUE;
782     }
783 
784     journal_cjf_page_cache_flush(jnl->file);
785     journal_cjf_header_flush(jnl);
786 
787 #if DO_SYNC
788     log_debug3("cjf: syncing to disk");
789 
790     file_pool_flush(jnl->file);
791 #endif
792 
793     log_debug_jnl(jnl, "cjf: journal_cjf_idxt_append_page_nogrow: AFTER");
794 }
795 
796 /**
797  *
798  * Grows the IDTX table by one slot
799  *
800  * @param jnl
801  */
802 
803 static void
journal_cjf_idxt_grow(journal_cjf * jnl)804 journal_cjf_idxt_grow(journal_cjf *jnl)
805 {
806     yassert(jnl->idxt.size > 0);
807 
808     log_debug2("cjf: growing IDXT table to %u slots", jnl->idxt.size + 1);
809 
810     journal_cjf_idxt_tbl_item *tmp;
811     MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, tmp, sizeof(journal_cjf_idxt_tbl_item) * (jnl->idxt.size + 1), JCJFITI_TAG);
812 
813     for(s16 idx = 0; idx < jnl->idxt.count; idx++)
814     {
815         tmp[idx] = jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
816     }
817 
818     ++jnl->idxt.size;
819 
820     for(s16 idx = jnl->idxt.count; idx < jnl->idxt.size; idx++)
821     {
822         tmp[idx].last_serial = 0;
823         tmp[idx].file_offset = 0;
824     }
825 
826     log_debug_jnl(jnl, "cjf: journal_cjf_idxt_grow: BEFORE");
827 
828     free(jnl->idxt.entries);
829     jnl->idxt.entries = tmp;
830     jnl->idxt.first = 0;
831 
832     log_debug_jnl(jnl, "cjf: journal_cjf_idxt_grow: AFTER");
833 
834 }
835 
836 /**
837  * Ensures there is at least one empty available PAGE slot in the IDTX
838  *
839  * @param jnl
840  */
841 
842 static void
journal_cjf_idxt_ensure_growth(journal_cjf * jnl)843 journal_cjf_idxt_ensure_growth(journal_cjf *jnl)
844 {
845     log_debug2("cjf: ensuring IDXT growth");
846 
847     if(jnl->idxt.count == jnl->idxt.size)
848     {
849         journal_cjf_idxt_grow(jnl);
850     }
851 }
852 
853 /**
854  *
855  * Prevent the IDXT table from growing further
856  *
857  * @param jnl
858  */
859 
860 static void
journal_cjf_idxt_fix_size(journal_cjf * jnl)861 journal_cjf_idxt_fix_size(journal_cjf *jnl)
862 {
863     yassert(jnl->idxt.size > 0);
864     yassert(jnl->idxt.size >= jnl->idxt.count);
865 
866     if(jnl->idxt.size != jnl->idxt.count)
867     {
868         log_debug2("cjf: fixing IDXT size from %u to %u", jnl->idxt.size, jnl->idxt.count);
869 
870         journal_cjf_idxt_tbl_item *tmp;
871         MALLOC_OR_DIE(journal_cjf_idxt_tbl_item*, tmp, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.count, JCJFITI_TAG);
872 
873         for(s16 i = 0; i < jnl->idxt.count; ++i)
874         {
875             tmp[i] = jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
876         }
877 
878 #if DEBUG
879         memset(jnl->idxt.entries, 0xfe, sizeof(journal_cjf_idxt_tbl_item) * jnl->idxt.size);
880 #endif
881         free(jnl->idxt.entries);
882 
883         jnl->idxt.entries = tmp;
884 
885         jnl->idxt.first = 0;
886         jnl->idxt.size = jnl->idxt.count;
887     }
888     else
889     {
890         log_debug2("cjf: fixing IDXT size to %u (nothing to do)", jnl->idxt.count);
891     }
892 }
893 
894 /**
895  * Appends an PAGE after this one
896  *
897  * @param jnl
898  */
899 
900 void
journal_cjf_idxt_append_page(journal_cjf * jnl)901 journal_cjf_idxt_append_page(journal_cjf *jnl)
902 {
903     // where are we in the file ?
904 
905     log_debug2("cjf: PAGE: @%08x -> %08x ... %08x [%08x; %08x]",
906                jnl->last_page.file_offset, jnl->last_page.records_limit, jnl->last_page.file_offset_limit, jnl->last_page.serial_start, jnl->last_page.serial_end);
907 
908     // if the PAGE (offset) is before the first PAGE (offset)
909 
910     if(jnl->last_page.file_offset < jnl->first_page_offset)
911     {
912         log_debug2("cjf: IDXT adding PAGE (middle of the file)");
913 
914         // we are in the middle of the physical file (meaning, physically before the first PAGE in the logical order)
915 
916         yassert(jnl->last_page.records_limit <= jnl->first_page_offset);
917 
918         // ensure there is enough room after us
919         // while there is not enough room, remove one page
920 
921         while(jnl->first_page_offset - jnl->last_page.records_limit < CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE)
922         {
923             journal_cjf_remove_first_page(jnl);
924 
925             if(jnl->last_page.file_offset >= jnl->first_page_offset)
926             {
927                 break;
928             }
929         }
930 
931         // we made room or we reached a limit before we got enough
932 
933         yassert(jnl->first_page_offset - jnl->last_page.records_limit >= CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE);
934 
935         // make the IDXT grow if it full already
936 
937         journal_cjf_idxt_ensure_growth(jnl);
938 
939         // create a new page at jnl->page.offset_next
940     }
941     else
942     {
943         // we are at the end of the physical file
944 
945         log_debug2("cjf: IDXT adding PAGE (end of the file)");
946 
947         /// @note 20150210 edf -- A journal cannot loop with only one PAGE
948         // if it is expected to go beyond the maximum size with the next update, prevent the growth of the idtx table
949         // if we don't have at least two PAGE, then continue to grow the IDXT
950 
951         const bool has_at_least_two_pages = (jnl->idxt.count > 1);
952 
953         const bool too_close_to_the_file_size_limit = (jnl->last_page.records_limit  + CJF_SECTION_INDEX_SIZE + CJF_PAGE_ARBITRARY_UPDATE_SIZE > jnl->file_maximum_size);
954 
955         if(has_at_least_two_pages && too_close_to_the_file_size_limit)
956         {
957             journal_cjf_idxt_fix_size(jnl);
958         }
959         else
960         {
961             journal_cjf_idxt_ensure_growth(jnl);
962         }
963 
964         // create a new page in the idxt
965     }
966 
967     journal_cjf_idxt_append_page_nogrow(jnl);
968 }
969 
970 /*
971  scans all the PAGE entries from the IDXT and get the one that contains the serial
972  */
973 
974 ya_result
journal_cjf_idxt_get_page_index_from_serial(const journal_cjf * jnl,u32 serial)975 journal_cjf_idxt_get_page_index_from_serial(const journal_cjf *jnl, u32 serial)
976 {
977     u32 prev = jnl->serial_begin;
978 
979     if(serial_lt(serial, prev))
980     {
981         return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
982     }
983 
984     u32 prev_serial = jnl->serial_begin;
985 
986     s16 n = jnl->idxt.count;
987     for(s16 i = 0; i < n; i++)
988     {
989         journal_cjf_idxt_tbl_item *entry;
990         entry = &jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
991         // the last serial of an entry is the one of the last SOA added on it
992         // we want to start after that one
993         if(serial_lt(serial, entry->last_serial))
994         {
995             log_debug1("journal_cjf_idxt_get_page_index_from_serial(%s, %d) returning %i (%i -> %i)", jnl->journal_file_name, serial, i, prev_serial, entry->last_serial);
996             return i;
997         }
998         prev_serial = entry->last_serial;
999     }
1000 
1001     return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1002 }
1003 
1004 u32
journal_cjf_idxt_get_page_serial_from_index(const journal_cjf * jnl,int idx)1005 journal_cjf_idxt_get_page_serial_from_index(const journal_cjf *jnl, int idx)
1006 {
1007     if(idx > 0)
1008     {
1009         journal_cjf_idxt_tbl_item *prev_entry;
1010         prev_entry = &jnl->idxt.entries[(jnl->idxt.first + idx - 1) % jnl->idxt.size];
1011         return prev_entry->last_serial;
1012     }
1013     else
1014     {
1015         return jnl->serial_begin;
1016     }
1017 }
1018 
1019 /**
1020  * Returns the page index containing the serial, and optionally its position in the file.
1021  *
1022  * @param jnl
1023  * @param serial
1024  * @param file_offset
1025  * @return
1026  */
1027 
1028 ya_result
journal_cjf_idxt_get_page_offset_from_serial(const journal_cjf * jnl,u32 serial,u32 * file_offset)1029 journal_cjf_idxt_get_page_offset_from_serial(const journal_cjf *jnl, u32 serial, u32 *file_offset)
1030 {
1031     u32 prev_serial = jnl->serial_begin;
1032 
1033     // ensure the journal starts at least from the serial we are looking for
1034 
1035     if(serial_lt(serial, prev_serial))
1036     {
1037         return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1038     }
1039 
1040     s16 n = jnl->idxt.count;
1041     for(s16 i = 0; i < n; i++)
1042     {
1043         journal_cjf_idxt_tbl_item *entry;
1044         entry = &jnl->idxt.entries[(jnl->idxt.first + i) % jnl->idxt.size];
1045 
1046         // entry->last_serial is the last_serial TO, so the start of the next page
1047 
1048         if(serial_lt(serial, entry->last_serial))
1049         {
1050             log_debug1("journal_cjf_idxt_get_page_index_from_serial(%s, %d) returning %i (%i -> %i)",
1051                     jnl->journal_file_name, serial, i, prev_serial, entry->last_serial);
1052             if(file_offset != NULL)
1053             {
1054                 *file_offset = entry->file_offset;
1055             }
1056             return i;
1057         }
1058 
1059         prev_serial = entry->last_serial;
1060     }
1061 
1062     // too far ...
1063 
1064     return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1065 }
1066 
1067 ya_result
journal_cjf_idxt_get_page_serial_to(const journal_cjf * jnl,int idx)1068 journal_cjf_idxt_get_page_serial_to(const journal_cjf *jnl, int idx)
1069 {
1070     journal_cjf_idxt_tbl_item *entry;
1071     entry = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
1072     return entry->last_serial;
1073 }
1074 
1075 u32
journal_cjf_idxt_get_page_offset(const journal_cjf * jnl,int idx)1076 journal_cjf_idxt_get_page_offset(const journal_cjf *jnl, int idx)
1077 {
1078     journal_cjf_idxt_tbl_item *entry;
1079     entry = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
1080     return entry->file_offset;
1081 }
1082 
1083 #endif
1084