1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 /** @defgroup
36  *  @ingroup
37  *  @brief
38  *
39  *
40  *
41  * @{
42  *
43  *----------------------------------------------------------------------------*/
44 
45 /*------------------------------------------------------------------------------
46  * GLOBAL VARIABLES */
47 
48 /*------------------------------------------------------------------------------
49  * STATIC PROTOTYPES */
50 
51 /*------------------------------------------------------------------------------
52  * FUNCTIONS */
53 
54 /** @brief Function ...
55  *
56  *  ...
57  *
58  *  @param ...
59  *
60  *  @retval OK
61  *  @retval NOK
62  */
63 
64 
65 #include "dnsdb/dnsdb-config.h"
66 
67 #define ZDB_JOURNAL_CODE 1
68 
69 #include "dnsdb/journal.h"
70 
71 #if JOURNAL_CJF_ENABLED
72 
73 #define JOURNAL_CJF_BASE 1
74 
75 #include "dnsdb/journal-cjf-page.h"
76 #include "dnsdb/journal-cjf-page-cache.h"
77 #include "dnsdb/journal-cjf-page-output-stream.h"
78 #include "dnsdb/journal-cjf-idxt.h"
79 #include "dnsdb/journal-cjf-common.h"
80 #include "dnsdb/journal-cjf.h"
81 
82 #include <sys/types.h>
83 #include <sys/stat.h>
84 #include <unistd.h>
85 #include <dirent.h>
86 #include <fcntl.h>
87 
88 #include <dnscore/file_input_stream.h>
89 #include <dnscore/empty-input-stream.h>
90 #include <dnscore/mutex.h>
91 #include <dnscore/serial.h>
92 #include <dnscore/dns_resource_record.h>
93 #include <dnscore/format.h>
94 
95 #include <dnscore/ptr_set.h>
96 #include <dnscore/fdtools.h>
97 
98 #include <dnscore/u32_set.h>
99 #include <dnscore/list-dl.h>
100 
101 #include <dnscore/ctrl-rfc.h>
102 
103 #include <dnscore/bytearray_output_stream.h>
104 #include <dnscore/bytearray_input_stream.h>
105 #include <dnscore/zalloc.h>
106 
107 #include "dnsdb/zdb_error.h"
108 #include "dnsdb/zdb_utils.h"
109 #include "dnsdb/journal.h"
110 #include "dnsdb/zdb_types.h"
111 #include "dnsdb/xfr_copy.h"
112 #include "dnsdb/zdb-zone-path-provider.h"
113 #include "dnsdb/zdb_zone.h"
114 
115 extern logger_handle* g_database_logger;
116 #define MODULE_MSG_HANDLE g_database_logger
117 
118 #define DEBUG_JOURNAL 1
119 #if !DEBUG
120 #undef DEBUG_JOURNAL
121 #define DEBUG_JOURNAL 0
122 #endif
123 
124 #define JOURNAL_FORMAT_NAME "cyclic"
125 #define VERSION_HI 0
126 #define VERSION_LO 1
127 #define JOURNAL_CLASS_NAME "journal_cjf"
128 
129 #define LOCK_NONE   0
130 #define LOCK_READ   1
131 #define LOCK_WRITE  2
132 
133 #define CJF_EXT "cjf"
134 #define CJF_EXT_STRLEN 3
135 
136 #define SOA_RDATA_SIZE_MAX 532
137 
138 #define DO_SYNC 1
139 
140 #define JRNLCJF_TAG 0x58494c4e524a
141 
142 /**
143  * Two steps means that the journal is written in two passes.
144  * Pass 1 gathers a full page from input and validates it.
145  * Pass 2 stores it to the journal file.
146  */
147 
148 #define CJF_USE_TWO_STEPS 1
149 
150 /*
151  * Contains the journal (almost: not the matching start and end SOA)
152  */
153 
154 #define CJF_WIRE_FILE_FORMAT "%s/%{dnsname}." CJF_EXT
155 #define FIRST_FROM_END  (CJF_EXT_STRLEN + (1 + 8 + 1 + 8))
156 #define LAST_FROM_END   (CJF_EXT_STRLEN + (1 + 8))
157 
158 /*******************************************************************************
159  *
160  *  JNL (HEADER) ---> IDXT
161  *   |                 |
162  *   +------+------+---+
163  *   |      |      |
164  *   v      v      v
165  *  PAGE -> PAGE -> PAGE
166  *   |      |      |
167  *   v      v      v
168  *  IXFRs  IXFRs  IXFRs
169  *
170  ******************************************************************************/
171 
172 /*
173  * MAGIC 'JNL' + Version 0
174  * Serial begin
175  * Serial end
176  * Begin Index Offset
177  * Table Index Offset
178  */
179 
180 
181 /**
182  * There is a need of lockable 4K pages in an MRU that points back to their user
183  * That's where the PAGE will be stored
184  * I'm not sure of what the ratio between allowed FDs and allowed PAGE pages should be.
185  */
186 
187 static shared_group_shared_mutex_t journal_shared_mtx;
188 static bool journal_initialized = FALSE;
189 
190 static file_pool_t journal_file_pool = 0;
191 
192 void
log_debug_jnl(journal_cjf * jnl,const char * text)193 log_debug_jnl(journal_cjf *jnl, const char *text)
194 {
195     log_debug4("cjf: %s,%p: %s: header SN=[%08x; %08x] F=%08x L=%08x dirty=%i empty=%i",
196                 jnl->journal_file_name, jnl->file, text,
197                 jnl->serial_begin, jnl->serial_end,
198                 jnl->first_page_offset, jnl->page_table_file_offset,
199                 journal_cjf_is_dirty(jnl),
200                 journal_cjf_isempty(jnl));
201 
202     s16 n = jnl->idxt.count;
203 
204     if(jnl->last_page.count == 0)
205     {
206         n--;
207     }
208 
209     log_debug4("cjf: %s,%p: %s: idxt %3hi/%3hi [%3hi] dirty=%i marked=%i",
210         jnl->journal_file_name, jnl->file, text,
211         jnl->idxt.count, jnl->idxt.size, jnl->idxt.first, (jnl->idxt.dirty)?1:0, (jnl->idxt.marked)?1:0);
212 
213     log_debug4("cjf: %s,%p: %s: page: SN=[%08x; %08x] count=%3u size=%3u at=%08x next=%08x ... limit=%08x",
214                jnl->journal_file_name, jnl->file, text,
215                jnl->last_page.serial_start, jnl->last_page.serial_end,
216                jnl->last_page.count,jnl->last_page.size,
217                jnl->last_page.file_offset, jnl->last_page.records_limit,
218                jnl->last_page.file_offset_limit);
219 
220     for(s16 idx = 0; idx < n; idx++)
221     {
222         journal_cjf_idxt_tbl_item *item = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
223 
224         log_debug4("cjf: %s,%p: %s: idxt[%3i] = %08x %08x", jnl->journal_file_name, jnl->file, text, idx, item->last_serial, item->file_offset);
225     }
226 
227     if(jnl->last_page.count == 0)
228     {
229         journal_cjf_idxt_tbl_item *item = &jnl->idxt.entries[(jnl->idxt.first + n) % jnl->idxt.size];
230 
231         log_debug4("cjf: %s,%p: %s: idxt[%3i] =  [empty] %08x", jnl->journal_file_name, jnl->file, text, n, item->file_offset);
232     }
233 }
234 
235 static void
journal_cjf_writelock(journal_cjf * jnl)236 journal_cjf_writelock(journal_cjf *jnl)
237 {
238 #if DEBUG
239     log_debug4("cjf: %s,%p: write lock", jnl->journal_file_name, jnl->file);
240 #endif
241     shared_group_mutex_lock(&jnl->mtx, GROUP_MUTEX_WRITE);
242 }
243 
244 static void
journal_cjf_writeunlock(journal_cjf * jnl)245 journal_cjf_writeunlock(journal_cjf *jnl)
246 {
247 #if DEBUG
248     log_debug4("cjf: %s,%p: write unlock", jnl->journal_file_name, jnl->file);
249 #endif
250     shared_group_mutex_unlock(&jnl->mtx, GROUP_MUTEX_WRITE);
251 }
252 
253 static void
journal_cjf_readlock(journal_cjf * jnl)254 journal_cjf_readlock(journal_cjf *jnl)
255 {
256 #if DEBUG
257     log_debug4("cjf: %s,%p: read lock", jnl->journal_file_name, jnl->file);
258 #endif
259     shared_group_mutex_lock(&jnl->mtx, GROUP_MUTEX_READ);
260 }
261 
262 static void
journal_cjf_readunlock(journal_cjf * jnl)263 journal_cjf_readunlock(journal_cjf *jnl)
264 {
265 #if DEBUG
266     log_debug4("cjf: %s,%p: read unlock", jnl->journal_file_name, jnl->file);
267 #endif
268     shared_group_mutex_unlock(&jnl->mtx, GROUP_MUTEX_READ);
269 }
270 
271 bool
journal_cjf_isreadlocked(journal_cjf * jnl)272 journal_cjf_isreadlocked(journal_cjf *jnl)
273 {
274     bool ret = shared_group_mutex_islocked_by(&jnl->mtx, GROUP_MUTEX_READ);
275     return ret;
276 }
277 
278 bool
journal_cjf_iswritelocked(journal_cjf * jnl)279 journal_cjf_iswritelocked(journal_cjf *jnl)
280 {
281     bool ret = shared_group_mutex_islocked_by(&jnl->mtx, GROUP_MUTEX_WRITE);
282     return ret;
283 }
284 
285 void
journal_cjf_release(journal_cjf * jnl)286 journal_cjf_release(journal_cjf *jnl)
287 {
288     journal_release((journal*)jnl);
289 }
290 
291 static journal_cjf* journal_cjf_alloc_default(const u8 *origin, const char *filename);
292 
293 static void
journal_cjf_load_idxt(journal_cjf * jnl)294 journal_cjf_load_idxt(journal_cjf *jnl)
295 {
296     if(jnl->idxt.entries != NULL)
297     {
298         return;
299     }
300 
301     journal_cjf_idxt_load(jnl);
302 
303     if(jnl->idxt.count > 0)
304     {
305         jnl->last_page.file_offset = journal_cjf_idxt_get_last_file_offset(jnl);
306         journal_cjf_page_tbl_header current_page_header;
307         journal_cjf_page_cache_read_header(jnl->file, jnl->last_page.file_offset, &current_page_header);
308         jnl->last_page.count = current_page_header.count;
309         jnl->last_page.size = current_page_header.size;
310 
311         if(jnl->last_page.file_offset < jnl->first_page_offset)
312         {
313             jnl->last_page.file_offset_limit = jnl->first_page_offset;
314         }
315         else
316         {
317             jnl->last_page.file_offset_limit = jnl->file_maximum_size;
318         }
319 
320         if(jnl->idxt.count > 1)
321         {
322             jnl->last_page.serial_start = journal_cjf_idxt_get_last_serial(jnl, jnl->idxt.count - 2);
323         }
324         else
325         {
326             jnl->last_page.serial_start = jnl->serial_begin;
327         }
328     }
329     else
330     {
331         jnl->idxt.dirty = FALSE;
332         jnl->flags |= JOURNAL_CFJ_FLAGS_DIRTY;
333 
334         journal_cjf_page_cache_flush(jnl->file);
335 
336         journal_cjf_idxt_destroy(jnl);
337 
338         jnl->serial_begin = 0;
339         jnl->serial_end = 0;
340 
341         jnl->mtx.owner = LOCK_NONE;
342         jnl->mtx.count = 0;
343         jnl->first_page_offset = CJF_HEADER_SIZE;
344         jnl->page_table_file_offset = 0;
345         jnl->last_soa_offset = 0;
346         jnl->file_maximum_size = MAX_U32;
347 
348         if(jnl->zone != NULL)
349         {
350             jnl->file_maximum_size = jnl->zone->wire_size >> 1;
351             zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
352         }
353 
354         jnl->last_page.file_offset = CJF_HEADER_SIZE;
355         jnl->last_page.count = 0;
356         jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
357         jnl->last_page.serial_start = 0;
358         jnl->last_page.serial_end = 0;
359         jnl->last_page.records_limit = jnl->last_page.file_offset + CJF_SECTION_INDEX_SIZE;
360         jnl->last_page.file_offset_limit = jnl->file_maximum_size;
361 
362 #if _BSD_SOURCE || _XOPEN_SOURCE >= 500 || _XOPEN_SOURCE && _XOPEN_SOURCE_EXTENDED || /* Since glibc 2.3.5: */ _POSIX_C_SOURCE >= 200112L
363         file_pool_resize(jnl->file, CJF_HEADER_SIZE);
364 #endif
365     }
366 }
367 
368 static int
journal_cjf_create_file(journal_cjf ** jnlp,const u8 * origin,const char * filename)369 journal_cjf_create_file(journal_cjf **jnlp, const u8 *origin, const char *filename)
370 {
371     log_debug3("cjf: %{dnsname}: creating %s", origin, filename);
372 
373     int flags = O_RDWR|O_CREAT|O_EXCL|O_CLOEXEC;
374 #ifdef O_NOATIME
375     flags |= O_NOATIME;
376 #endif
377     file_pool_file_t file;
378     ya_result ret;
379     cjf_header hdr;
380 
381     file = file_pool_open_ex(journal_file_pool, filename, flags, 0644);
382 
383     if(file != NULL)
384     {
385         journal_cjf *jnl = journal_cjf_alloc_default(origin, filename);
386 
387         hdr.magic_plus_version = CJF_CJF0_MAGIC;
388         hdr.serial_begin = 0;
389         hdr.serial_end = 0;
390         hdr.first_index_offset = 0;
391         hdr.table_index_offset = 0;
392         hdr.last_soa_offset = 0,
393         hdr.last_page_offset_next = 0;
394         //hdr.last_page_item_count = 0;
395         hdr.flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN; // not dirty
396 
397         ssize_t n = file_pool_writefully(file, &hdr, CJF_HEADER_SIZE);
398         if(n < 0)
399         {
400             ret = ERRNO_ERROR;
401             return ret;
402         }
403 
404         jnl->file = file;
405 
406         *jnlp = jnl;
407 
408         return SUCCESS;
409     }
410     else
411     {
412         ret = ERRNO_ERROR;
413         log_err("cjf: %s: failed to create %s: %r", origin, filename, ret);
414 
415         *jnlp = NULL;
416 
417         return ret;
418     }
419 }
420 
421 /**
422  *
423  * Does NOT set the fd field in jnl
424  * MUST return -1 in case of error
425  *
426  * @param jnl
427  * @param create
428  * @return the file descriptor or an error code
429  */
430 
431 static int
journal_cjf_init_from_file(journal_cjf ** jnlp,const u8 * origin,const char * filename,bool create)432 journal_cjf_init_from_file(journal_cjf **jnlp, const u8 *origin, const char *filename, bool create)
433 {
434     log_debug3("cjf: %{dnsname}: opening%s %s", origin, (create)?"/creating":"", filename);
435 
436     int flags = O_RDWR|O_CLOEXEC;
437 #ifdef O_NOATIME
438     flags |= O_NOATIME;
439 #endif
440     file_pool_file_t file;
441     ya_result ret;
442     bool bad_journal = FALSE;
443     cjf_header hdr;
444 
445     file = file_pool_open_ex(journal_file_pool, filename, flags, 0660);
446 
447     if(file == NULL)
448     {
449         ret = ERRNO_ERROR;
450         log_debug3("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
451 
452         if(create)
453         {
454             ret = journal_cjf_create_file(jnlp, origin, filename);
455         }
456 
457         return ret;
458     }
459 
460     s64 size = filesize(filename);
461     if(size < CJF_HEADER_SIZE)
462     {
463         bad_journal = TRUE;
464     }
465 
466     // look if the journal makes sense
467 
468     if(FAIL(ret = file_pool_readfully(file, &hdr, sizeof(hdr))))
469     {
470         ret = ERRNO_ERROR;
471         log_err("cjf: %{dnsname}: could not read header on %s: %r", origin, filename, ret);
472         bad_journal = TRUE;
473     }
474     else if((hdr.magic_plus_version != CJF_CJF0_MAGIC) || ((hdr.flags & JOURNAL_CFJ_FLAGS_MY_ENDIAN) == 0) )
475     {
476         if(hdr.magic_plus_version != CJF_CJF0_MAGIC)
477         {
478             log_err("cjf: %{dnsname}: wrong magic on %s", origin, filename);
479         }
480         else
481         {
482             log_err("cjf: %{dnsname}: wrong endian on %s", origin, filename);
483         }
484 
485         bad_journal = TRUE;
486     }
487     else if(hdr.first_index_offset == 0)
488     {
489         bad_journal = TRUE;
490     }
491 
492     if(!bad_journal)
493     {
494         // it does makes sense
495 
496         // note: DO NOT jnl->file = fd;
497 
498         journal_cjf *jnl = journal_cjf_alloc_default(origin, filename);
499 
500         jnl->flags = hdr.flags;
501 
502         jnl->serial_begin = hdr.serial_begin;
503         jnl->serial_end = hdr.serial_end;
504         jnl->first_page_offset = hdr.first_index_offset;
505         jnl->page_table_file_offset = hdr.table_index_offset;
506         jnl->last_soa_offset = hdr.last_soa_offset;
507 
508         jnl->last_page.serial_end = jnl->serial_end;
509         jnl->last_page.records_limit = hdr.last_page_offset_next;
510 
511         jnl->file = file;
512 
513         log_debug("cjf: %{dnsname}: journal expected to cover serials from %i to %i", jnl->origin, hdr.serial_begin, hdr.serial_end);
514         log_debug("cjf: %{dnsname}: journal table index located at %x%s", jnl->origin, hdr.table_index_offset,
515             (hdr.table_index_offset!=0)?"":", which means it has not been closed properly");
516 
517         *jnlp = jnl;
518 
519         return SUCCESS;
520     }
521     else
522     {
523         // the journal content is unexpected
524 
525         file_pool_close(file);
526         file = NULL;
527 
528         char broken_file_path[PATH_MAX];
529 
530         if(ISOK(ret = snformat(broken_file_path, sizeof(broken_file_path),"%s.bad-journal", filename)))
531         {
532             bool try_again = create;
533 
534             // remove previous bad-journal if any
535             if(unlink(broken_file_path) < 0)
536             {
537                 ret = ERRNO_ERROR;
538                 if(ret == MAKE_ERRNO_ERROR(ENOENT))
539                 {
540                     ret = SUCCESS;
541                 }
542                 else
543                 {
544                     log_err("cjf: %{dnsname}: unable to delete previous bad journal %s: %r", origin, broken_file_path, ret);
545                     try_again = FALSE;
546                 }
547             }
548 
549             // successfully handled the previous .bad-journal
550 
551             if(ISOK(ret))
552             {
553                 // rename the journal into bad-journal
554                 if(rename(filename, broken_file_path) < 0)
555                 {
556                     ret = ERRNO_ERROR;
557                     log_err("cjf: %{dnsname}: unable to rename %s into %s: %r", origin, filename, broken_file_path, ret);
558 
559                     if(unlink(filename) < 0)
560                     {
561                         ret = ERRNO_ERROR;
562                         log_err("cjf: %{dnsname}: unable to delete %s: %r", origin, filename, ret);
563                         try_again = FALSE;
564                     }
565                 }
566 
567                 ret = ZDB_ERROR_ICMTL_NOTFOUND;
568             }
569 
570             if(try_again) // we are allowed to create and got no counter-indication
571             {
572                 int ret = journal_cjf_create_file(jnlp, origin, filename); // we are in a branch where "create = TRUE"
573 
574                 return ret;
575             }
576         }
577         else
578         {
579             log_err("cjf: %{dnsname}: %s is a bad journal, please remove it.", origin, filename);
580         }
581     }
582 
583     return ret;
584 }
585 
586 void
journal_cjf_header_flush(journal_cjf * jnl)587 journal_cjf_header_flush(journal_cjf *jnl)
588 {
589     if(journal_cjf_is_dirty(jnl))
590     {
591         yassert(jnl->file != NULL);
592 
593         log_debug("cjf: %s,%p: flushing header SN=[%08x; %08x] F=%08x T=%08x", jnl->journal_file_name, jnl->file,
594                 jnl->serial_begin, jnl->serial_end, jnl->first_page_offset, jnl->page_table_file_offset);
595 
596         off_t pos;
597 
598         if((pos = file_pool_seek(jnl->file, 4, SEEK_SET)) != 4)
599         {
600             log_err("cjf: %s,%p: failed to set file position: %lli instead of %i (%r)", jnl->journal_file_name, jnl->file, pos, 4, ERRNO_ERROR);
601             logger_flush();
602             abort();
603         }
604 
605         cjf_header hdr;
606         //hdr.magic_plus_version = 0;
607         hdr.serial_begin = jnl->serial_begin;
608         hdr.serial_end = jnl->serial_end;
609         hdr.first_index_offset = jnl->first_page_offset;
610         hdr.table_index_offset = jnl->page_table_file_offset;
611         hdr.last_soa_offset = jnl->last_soa_offset;
612         hdr.last_page_offset_next = jnl->last_page.records_limit;
613         //hdr.last_page_item_count = jnl->page.count;
614         hdr.flags = jnl->flags;
615 
616         file_pool_writefully(jnl->file, &hdr.serial_begin, CJF_HEADER_SIZE - 4);
617 
618         journal_cjf_clear_dirty(jnl);
619     }
620 }
621 
622 /**
623  *
624  * Removes the first PAGE from the journal.
625  * Adjust the current PAGE limit;
626  *
627  * @param jnl
628  */
629 
630 void
journal_cjf_remove_first_page(journal_cjf * jnl)631 journal_cjf_remove_first_page(journal_cjf *jnl)
632 {
633     log_debug_jnl(jnl, "journal_cjf_remove_first_page: BEFORE");
634 
635     u32 stored_serial = jnl->serial_begin + 1; // (ensure an error would trigger a flush)
636 
637     u8 zt = 0;
638     if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
639     {
640         if(zt == ZT_MASTER)
641         {
642             zdb_zone_info_get_stored_serial(jnl->origin, &stored_serial); // for master only
643 
644             if(serial_le(stored_serial, jnl->serial_begin))
645             {
646                 log_debug("cjf: %s,%p: journal page %u will be lost, flushing zone first", jnl->journal_file_name, jnl->file, jnl->journal_file_name, jnl->serial_begin);
647                 zdb_zone_info_background_store_zone(jnl->origin);
648             }
649         }
650     }
651 
652     journal_cjf_page_tbl_header first_page_hdr;
653     journal_cjf_page_cache_read_header(jnl->file, jnl->first_page_offset,  &first_page_hdr);
654     if(first_page_hdr.next_page_offset < jnl->first_page_offset)
655     {
656         // this is the last page, of the file, physically
657         jnl->page_table_file_offset = jnl->last_page.records_limit;
658         jnl->idxt.dirty = TRUE;
659     }
660 
661     journal_cjf_page_cache_clear(jnl->file, jnl->first_page_offset);
662 
663     jnl->serial_begin = journal_cjf_idxt_get_last_serial(jnl, 0);
664     jnl->first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 1);
665 
666     ++jnl->idxt.first;
667     --jnl->idxt.count;
668 
669     journal_cjf_set_dirty(jnl);
670 
671     if(jnl->last_page.file_offset < jnl->first_page_offset)
672     {
673         jnl->last_page.file_offset_limit = jnl->first_page_offset;
674     }
675     else // at or after
676     {
677         jnl->last_page.file_offset_limit = jnl->file_maximum_size;
678     }
679 
680     log_debug_jnl(jnl, "journal_cjf_remove_first_page: AFTER");
681 
682     log_debug("cjf: %s,%p: first PAGE now at %u (%08x), journal starts with serial %u (%08x", jnl->journal_file_name, jnl->file,
683                 jnl->first_page_offset, jnl->first_page_offset, jnl->serial_begin, jnl->serial_begin);
684 }
685 
686 /*******************************************************************************
687  *
688  * Index table handling functions
689  *
690  ******************************************************************************/
691 
692 /*****************************************************************************/
693 
694 static void journal_cjf_writelock(journal_cjf *jnl);
695 static void journal_cjf_writeunlock(journal_cjf *jnl);
696 
697 static void journal_cjf_readlock(journal_cjf *jnl);
698 static void journal_cjf_readunlock(journal_cjf *jnl);
699 
700 static const char *
journal_cjf_get_format_name()701 journal_cjf_get_format_name()
702 {
703     return JOURNAL_FORMAT_NAME;
704 }
705 
706 static u32
journal_cjf_get_format_version()707 journal_cjf_get_format_version()
708 {
709     return VERSION_U32(VERSION_HI,VERSION_LO);
710 }
711 
712 static ya_result
journal_cjf_read_soa_record(dns_resource_record * rr,input_stream * ixfr_wire_is)713 journal_cjf_read_soa_record(dns_resource_record *rr, input_stream *ixfr_wire_is)
714 {
715     ya_result return_value;
716 
717     if((return_value = dns_resource_record_read(rr, ixfr_wire_is)) <= 0)
718     {
719         /* FAIL or EOF */
720         return return_value;
721     }
722 
723 #if DEBUG
724     rdata_desc rdatadesc = {rr->tctr.qtype, rr->rdata_size, rr->rdata};
725     log_debug("cjf: %{dnsname} %{typerdatadesc}", rr->name, &rdatadesc);
726 #endif
727 
728     if((rr->tctr.qtype != TYPE_SOA) || (rr->rdata_size > SOA_RDATA_SIZE_MAX))
729     {
730         log_err("cjf: expected SOA record but got %{dnstype} instead", &rr->tctr.qtype);
731 
732         return ZDB_JOURNAL_SOA_RECORD_EXPECTED;
733     }
734 
735     return return_value;
736 }
737 
738 struct journal_cjf_read_ixfr_s
739 {
740     input_stream *ixfr_wire_is;
741     output_stream baos;
742     dns_resource_record rr;
743     u32 serial_from;
744     u32 serial_to;
745     u32 size;
746     bool eof;
747 };
748 
749 typedef struct journal_cjf_read_ixfr_s journal_cjf_read_ixfr_s;
750 
751 ya_result
journal_cjf_read_ixfr_init(journal_cjf_read_ixfr_s * ixfrinc,input_stream * ixfr_wire_is)752 journal_cjf_read_ixfr_init(journal_cjf_read_ixfr_s *ixfrinc, input_stream *ixfr_wire_is)
753 {
754     ya_result ret;
755     ixfrinc->ixfr_wire_is = ixfr_wire_is;
756     bytearray_output_stream_init_ex(&ixfrinc->baos, NULL, 65536, BYTEARRAY_DYNAMIC);
757     dns_resource_record_init(&ixfrinc->rr);
758     ixfrinc->serial_from = 0;
759     ixfrinc->serial_to = 0;
760     ixfrinc->size = 0;
761     ixfrinc->eof = FALSE;
762 
763     ret = journal_cjf_read_soa_record(&ixfrinc->rr, ixfr_wire_is);
764 
765 #if DEBUG
766     if(ISOK(ret))
767     {
768         log_debug2("cjf: ---: started with %{dnsrr}", &ixfrinc->rr);
769     }
770 #endif
771 
772     return ret;
773 }
774 
775 void
journal_cjf_read_ixfr_finalize(journal_cjf_read_ixfr_s * ixfrinc)776 journal_cjf_read_ixfr_finalize(journal_cjf_read_ixfr_s *ixfrinc)
777 {
778     ixfrinc->ixfr_wire_is = NULL;
779 
780     dns_resource_record_clear(&ixfrinc->rr);
781     output_stream_close(&ixfrinc->baos);
782 
783     ixfrinc->serial_from = 0;
784     ixfrinc->serial_to = 0;
785     ixfrinc->size = 0;
786 }
787 
788 /**
789  *
790  * Reads a single page of incremental changes (-SOA ... +SOA ...)
791  *
792  * @param ixfrinc
793  * @return the size of the page (0 if there is nothing to be read), or an error code
794  */
795 
796 static ya_result
journal_cjf_read_ixfr_read(journal_cjf_read_ixfr_s * ixfrinc)797 journal_cjf_read_ixfr_read(journal_cjf_read_ixfr_s *ixfrinc)
798 {
799     if(ixfrinc->eof)
800     {
801         ixfrinc->size = 0;
802         return 0;
803     }
804 
805     input_stream *ixfr_wire_is = ixfrinc->ixfr_wire_is;
806     output_stream *baos = &ixfrinc->baos;
807     dns_resource_record *rr = &ixfrinc->rr;
808 
809     ya_result ret;
810     bool need_another_soa = TRUE;
811 
812     bytearray_output_stream_reset(baos);
813 
814     // must start by an SOA
815 
816     if(rr->tctr.qtype == TYPE_SOA)
817     {
818         ret = rr_soa_get_serial(rr->rdata, rr->rdata_size, &ixfrinc->serial_from);
819     }
820     else
821     {
822         ret = ZDB_JOURNAL_SOA_RECORD_EXPECTED;
823     }
824 
825     ixfrinc->size = 0;
826 
827     if(ISOK(ret))
828     {
829         for(int idx = 0;; ++idx)
830         {
831 #if DEBUG
832             log_debug2("cjf: ---: %4i: %{dnsrr}", idx, rr);
833 #endif
834             dns_resource_record_write(rr, baos);
835 
836             if((ret = dns_resource_record_read(rr, ixfr_wire_is)) <= 0)
837             {
838                 if(ret == 0)
839                 {
840                     if(!need_another_soa)
841                     {
842                         ixfrinc->size = bytearray_output_stream_size(baos);
843 #if DEBUG
844                         log_debug2("cjf: ===: IXFR incremental change size: %i", ixfrinc->size);
845 #endif
846                         ret = ixfrinc->size;
847                     }
848                     else
849                     {
850 #if DEBUG
851                         log_debug2("cjf: ===: still expected an SOA");
852 #endif
853                         // SOA expected
854                         ret = ZDB_JOURNAL_SOA_RECORD_EXPECTED;
855                     }
856 
857                     ixfrinc->eof = TRUE;
858                 }
859                 else
860                 {
861 #if DEBUG
862                     log_debug2("cjf: ===: failed to read the next record: %r", ret);
863 #endif
864                 }
865 
866                 break;
867             }
868 
869             if(rr->tctr.qtype == TYPE_SOA)
870             {
871                 if(need_another_soa)
872                 {
873                     if(FAIL(ret = rr_soa_get_serial(rr->rdata, rr->rdata_size, &ixfrinc->serial_to)))
874                     {
875 #if DEBUG
876                         log_debug2("cjf: ===: failed parse serial from record: %r", ret);
877 #endif
878                         break;
879                     }
880 
881                     need_another_soa = FALSE;
882                 }
883                 else
884                 {
885                     // another page starts here
886                     // this record will written for the next page
887 
888                     ixfrinc->size = bytearray_output_stream_size(baos);
889 #if DEBUG
890                     log_debug2("cjf: ===: IXFR incremental change size: %i (followed ...)", ixfrinc->size);
891 #endif
892                     ret = ixfrinc->size;
893                     break;
894                 }
895             }
896         }
897     }
898 
899     return ret;
900 }
901 
/**
 * The caller is about to take an action that ends up removing the first page,
 * either explicitly or by overwriting it (i.e.: looping).
 *
 * This function ensures that it is safe to do so.
 *
 * Returns:
 *
 * 0 if it is safe to do so and no action was taken,
 * 1 if it is safe to do so but the zone needed to be stored,
 * or an error code.
 *
 * @param jnl the journal
 *
 * @return the state of the operation
 *
 */
919 
920 static ya_result
journal_cjf_append_ixfr_stream_first_page_removal(journal_cjf * jnl)921 journal_cjf_append_ixfr_stream_first_page_removal(journal_cjf *jnl)
922 {
923     // the caller will remove first page to make room, prepare for it
924 
925     ya_result ret;
926     u32 zone_stored_serial;
927 
928     // get the serial of the stored zone
929 
930     if(FAIL(ret = zdb_zone_info_get_stored_serial(jnl->origin, &zone_stored_serial)))
931     {
932         log_warn("cjf: %{dnsname}: could not get the serial of the stored zone: %r", jnl->origin, ret);
933         return ret;
934     }
935 
936     u8 zt = 0;
937     if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
938     {
939         if(zt == ZT_SLAVE)
940         {
941             u32 ts = jnl->zone->axfr_timestamp;
942             u32 sr = jnl->zone->axfr_serial;
943             if(ts > 1)
944             {
945                 if(serial_gt(sr, zone_stored_serial))
946                 {
947                     zone_stored_serial = sr;
948                 }
949             }
950         }
951     }
952 
953     // get the page of the serial
954 
955     if(FAIL(ret = journal_cjf_idxt_get_page_offset_from_serial(jnl, zone_stored_serial, NULL)))
956     {
957         if(serial_le(jnl->serial_end, zone_stored_serial))
958         {
959             log_debug("cjf: %{dnsname}: no need to store the zone again as it's already %i steps further", jnl->origin, zone_stored_serial - jnl->serial_end);
960             return 0;
961         }
962         else
963         {
964             log_warn("cjf: %{dnsname}: could not get page of serial %u: %r", jnl->origin, zone_stored_serial, ret);
965             return ret;
966         }
967     }
968 
969     // ret is the index of the page, if it is 0 we may need to save the current zone
970 
971     bool need_to_store_before_removing_first_page = (ret == 0);
972 
973     log_debug("cjf: %{dnsname}: zone currently stored up to serial %i, located on page %i of the journal", jnl->origin, zone_stored_serial, ret);
974 
975     if(need_to_store_before_removing_first_page)
976     {
977         // we are about to destroy the page of the currently stored serial AND
978         // there are steps remaining to be safe
979 
980         log_warn("cjf: %{dnsname}: need to store the zone right now, consider increasing the journal size", jnl->origin);
981 
982         zdb_zone *zone = (zdb_zone*)jnl->zone;
983 
984         // the zone at this point is supposed to be locked
985         // either simply with a simple reader
986         // either doubly with something and a simple reader
987         // if simply : do nothing
988         // if doubly : with the simple reader : exchange them
989         //             with anything else : it cannot ever work
990 
991         u8 owner = zone->lock_owner;
992         u8 reserved_owner = zone->lock_reserved_owner;
993         if(owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
994         {
995             zdb_zone_lock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
996             ret = zdb_zone_info_store_locked_zone(jnl->origin);
997             zdb_zone_unlock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
998         }
999         else if(owner == ZDB_ZONE_MUTEX_NOBODY)
1000         {
1001             zdb_zone_lock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1002             ret = zdb_zone_info_store_locked_zone(jnl->origin);
1003             zdb_zone_unlock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1004         }
1005         else if(reserved_owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
1006         {
1007             zdb_zone_exchange_locks(zone, owner, ZDB_ZONE_MUTEX_SIMPLEREADER);
1008             ret = zdb_zone_info_store_locked_zone(jnl->origin);
1009             zdb_zone_exchange_locks(zone, ZDB_ZONE_MUTEX_SIMPLEREADER, owner);
1010         }
1011         else
1012         {
1013             ret = ERROR; // obsolete
1014         }
1015 
1016         if(FAIL(ret))
1017         {
1018             log_warn("cjf: %{dnsname}: cannot store the zone: %r", jnl->origin, ret);
1019             return ret;
1020         }
1021 
1022         return 1; // try again
1023     }
1024     else
1025     {
1026         return 0; // continue
1027     }
1028 }
1029 
journal_cjf_get_space_left_until_need_storage_page(journal_cjf * jnl)1030 static s64 journal_cjf_get_space_left_until_need_storage_page(journal_cjf *jnl)
1031 {
1032     ya_result ret;
1033     u32 zone_stored_serial;
1034 
1035     if(FAIL(ret = zdb_zone_info_get_stored_serial(jnl->origin, &zone_stored_serial)))
1036     {
1037         log_warn("cjf: %{dnsname}: could not get teh serial of the stored zone: %r", jnl->origin, ret);
1038 
1039         // save asap
1040 
1041         return 0;
1042     }
1043 
1044     u8 zt = 0;
1045     if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
1046     {
1047         if(zt == ZT_SLAVE)
1048         {
1049             u32 ts = jnl->zone->axfr_timestamp;
1050             u32 sr = jnl->zone->axfr_serial;
1051             if(ts > 1)
1052             {
1053                 if(serial_gt(sr, zone_stored_serial))
1054                 {
1055                     zone_stored_serial = sr;
1056                 }
1057             }
1058         }
1059     }
1060 
1061     // get the page of the serial
1062 
1063     if(FAIL(ret = journal_cjf_idxt_get_page_offset_from_serial(jnl, zone_stored_serial, NULL)))
1064     {
1065         if(serial_le(jnl->serial_end, zone_stored_serial))
1066         {
1067             log_debug("cjf: %{dnsname}: no need to store the zone again as it's already %i steps further", jnl->origin, zone_stored_serial - jnl->serial_end);
1068 
1069             // the journal is only there for the slaves, it could be completely replaced
1070 
1071             return jnl->file_maximum_size;
1072         }
1073         else
1074         {
1075             log_warn("cjf: %{dnsname}: could not get page of serial %u: %r", jnl->origin, zone_stored_serial, ret);
1076 
1077             // save asap
1078 
1079             return 0;
1080         }
1081     }
1082 
1083     const journal_cjf_idxt_tbl_item* need_storage = journal_cjf_idxt_get_entry(jnl, ret);
1084 
1085     if(jnl->last_page.file_offset == need_storage->file_offset)
1086     {
1087         // we are on the page : we basically have the size of our page minus the file size
1088 
1089         return MAX((s32)(jnl->file_maximum_size - (jnl->last_page.records_limit - jnl->last_page.file_offset)), 0);
1090     }
1091     else if(jnl->last_page.file_offset < need_storage->file_offset)
1092     {
1093         // we have everything until that page
1094 
1095         return MAX((s32)(need_storage->file_offset - jnl->last_page.records_limit), 0);
1096     }
1097     else // if(jnl->last_page.file_offset > need_storage->file_offset)
1098     {
1099         // we have the remaining space until the end of the file plus the offset of the page (minus the header)
1100 
1101         return MAX((s32)(jnl->file_maximum_size - jnl->last_page.records_limit), 0) + need_storage->file_offset;
1102     }
1103 }
1104 
1105 static ya_result
journal_cjf_append_ixfr_stream_per_page(journal * jh,input_stream * ixfr_wire_is,bool is_slave)1106 journal_cjf_append_ixfr_stream_per_page(journal *jh, input_stream *ixfr_wire_is, bool is_slave)
1107 {
1108     journal_cjf *jnl = (journal_cjf*)jh;
1109     ya_result ret;
1110 
1111     log_debug("cjf: %s,%p: append IXFR (master)", jnl->journal_file_name, jnl->file);
1112 
1113     // ensure the zone locks are usable : locked by the reader, by nobody, or the reader is a reserved owner
1114     if(jnl->zone != NULL)
1115     {
1116         zdb_zone *zone = (zdb_zone*)jnl->zone;
1117         u8 owner = zone->lock_owner;
1118         u8 reserved_owner = zone->lock_reserved_owner;
1119         if(
1120                 !(
1121                 (owner == ZDB_ZONE_MUTEX_SIMPLEREADER) ||
1122                 (owner == ZDB_ZONE_MUTEX_NOBODY) ||
1123                 (reserved_owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
1124                 )
1125             )
1126         {
1127             log_err("cjf: %s,%p: append IXFR (master) cannot happen because the zone locks are not set properly", jnl->journal_file_name, jnl->file);
1128             return ERROR;
1129         }
1130     }
1131 
1132     int written_pages = 0;
1133 
1134     dns_resource_record rr;
1135     dns_resource_record_init(&rr);
1136 
1137     output_stream os;
1138     output_stream_set_void(&os); // very important
1139 
1140     journal_cjf_read_ixfr_s ixfrinc;
1141     journal_cjf_read_ixfr_init(&ixfrinc, ixfr_wire_is);
1142 
1143     journal_cjf_writelock(jnl);
1144 
1145     for(;;)
1146     {
1147         ret = journal_cjf_read_ixfr_read(&ixfrinc);
1148 
1149         if(ret <= 0)
1150         {
1151             journal_cjf_page_output_stream_cancel(&os);
1152 
1153             if(ret == 0)
1154             {
1155                 log_info("cjf: %{dnsname}: no incremental changes remaining", jnl->origin);
1156             }
1157             else
1158             {
1159                 log_err("cjf: %{dnsname}: failed to read changes: %r", jnl->origin, ret);
1160             }
1161             break;
1162         }
1163 
1164         // else records have been read
1165 
1166         yassert((ixfrinc.serial_from == ixfrinc.serial_to) || (ixfrinc.size != 0));
1167 
1168         log_info("cjf: %{dnsname}: incremental changes read from %u to %u (%u bytes)", jnl->origin, ixfrinc.serial_from, ixfrinc.serial_to, ixfrinc.size);
1169 
1170         bool journal_is_empty = journal_cjf_isempty(jnl);
1171 
1172         if(!journal_is_empty)
1173         {
1174             // if the journal is not empty, ensure the page follows the last journal page
1175 
1176             if(serial_lt(ixfrinc.serial_from, jnl->serial_end))
1177             {
1178                 log_info("cjf: %{dnsname}: ignoring changes before serial %u", jnl->origin, jnl->serial_end);
1179 
1180                 continue;   // read next page
1181             }
1182             else if(serial_gt(ixfrinc.serial_from, jnl->serial_end))
1183             {
1184                 log_warn("cjf: %{dnsname}: missing changes between serials %u and %u", jnl->origin, jnl->serial_end, ixfrinc.serial_from);
1185 
1186                 break;      // full stop
1187             }
1188         }
1189         else
1190         {
1191             // create a journal with one entry
1192 
1193             journal_cjf_idxt_create(jnl, 1);
1194         }
1195 
1196         /***/
1197 
1198         // reserve the known size of this single page with the serial range
1199         // write the page
1200 
1201         /***/
1202 
1203 journal_cjf_append_ixfr_stream_master_accum_tryagain:
1204 
1205         journal_cjf_page_output_stream_reopen(&os, jnl);
1206 
1207         // the file is cycling, so we also need to see if we are writing at
1208         // a position before the first page's and thus risking to overwrite it
1209 
1210         s64 available = 0;
1211 
1212         while(journal_cjf_page_current_output_stream_may_overwrite(jnl))
1213         {
1214             // if total available is smaller than half the file, the division will be >= 2
1215 
1216             s64 total_available = journal_cjf_get_space_left_until_need_storage_page(jnl);
1217 
1218             // should not store in background. Handle it first-hand (maybe postpone the update) ... (obsolete)
1219 
1220             if( ((total_available > 0) && ((jnl->file_maximum_size / total_available) >= 2)) || (total_available == 0))
1221             {
1222                 // if not writing already, then write
1223                 // 0 = not saving, 1 = saving, <0 = error
1224                 if(zdb_zone_info_background_store_in_progress(jnl->origin) != 1)
1225                 {
1226                     zdb_zone_info_background_store_zone(jnl->origin);
1227                 }
1228             }
1229 
1230             available = jnl->first_page_offset - jnl->last_page.records_limit;
1231 
1232             if(available >= ixfrinc.size)
1233             {
1234                 log_debug("cjf: %{dnsname}: %lli >= %i storage available before overwriting the first page", jnl->origin, available, ixfrinc.size);
1235                 break;
1236             }
1237 
1238             if(FAIL(ret = journal_cjf_append_ixfr_stream_first_page_removal(jnl)))
1239             {
1240                 break;
1241             }
1242 
1243             if(ret == 1)
1244             {
1245                 continue;
1246             }
1247 
1248             journal_cjf_remove_first_page(jnl);
1249         } // while may overwrite
1250 
1251         if(FAIL(ret))
1252         {
1253             journal_cjf_page_output_stream_cancel(&os);
1254             break;
1255         }
1256 
1257         // available space between the first available byte to write records and
1258         // the file size limit
1259 
1260         if(!journal_cjf_page_current_output_stream_may_overwrite(jnl))
1261         {
1262             s64 total_available = journal_cjf_get_space_left_until_need_storage_page(jnl);
1263 
1264             if( ((total_available > 0) && ((jnl->file_maximum_size / total_available) >= 2)) || (total_available == 0))
1265             {
1266                 // if not writing already, then write
1267                 // 0 = not saving, 1 = saving, <0 = error
1268                 if(zdb_zone_info_background_store_in_progress(jnl->origin) != 1)
1269                 {
1270                     zdb_zone_info_background_store_zone(jnl->origin);
1271                 }
1272             }
1273 
1274             // space available until the current limit of the page (file size or first page offset)
1275             available = journal_cjf_get_last_page_available_space_left(jnl);
1276 
1277             if(available >= ixfrinc.size)
1278             {
1279                 log_debug("cjf: %{dnsname}: %lli >= %i storage available in this page", jnl->origin, available, ixfrinc.size);
1280             }
1281             else
1282             {
1283                 // not enough room, but can we handle it ?
1284                 //
1285                 // if there is only one NON-EMPTY page in the journal,
1286                 //    cut it and create a new one
1287                 // if there are at least two NON-EMPTY pages and the available space between them is big enough,
1288                 //    cut the last page
1289                 //    remove the first
1290                 // else just complain about the journal size and continue
1291 
1292                 int page_count = journal_cjf_idxt_get_page_count(jnl);
1293                 bool last_page_empty = journal_cjf_page_line_count(jnl) == 0;
1294 
1295                 yassert(page_count > 0);
1296 
1297                 if(page_count == 1)
1298                 {
1299                     if(!last_page_empty)
1300                     {
1301                         log_debug("cjf: %{dnsname}: one page: append another page", jnl->origin);
1302 
1303                         // cut and proceed, as we will probably want to roll on the next update
1304 
1305                         journal_cjf_page_output_stream_cancel(&os);
1306                         journal_cjf_idxt_append_page(jnl);
1307                         output_stream_close(&os);
1308                         output_stream_set_void(&os); // very important
1309 
1310                         //journal_cjf_page_output_stream_reopen(&os, jnl);
1311 
1312                         // and loop
1313 
1314                         ret = SUCCESS;
1315 
1316                         goto journal_cjf_append_ixfr_stream_master_accum_tryagain;
1317                     }
1318                     else
1319                     {
1320                         log_debug("cjf: %{dnsname}: one empty page: write on it", jnl->origin);
1321                         // just proceed
1322                     }
1323                 }
1324                 else
1325                 {
1326                     if(!last_page_empty)
1327                     {
1328                         u32 available_from_beginning = jnl->last_page.file_offset - CJF_PAGE_SIZE_IN_BYTE - CJF_HEADER_SIZE;
1329 
1330                         if(ixfrinc.size <= available_from_beginning)
1331                         {
1332                             // for a slave only, ensure the journal has been read before going further.
1333 
1334                             if(is_slave)
1335                             {
1336                                 // page_count > 1
1337 
1338                                 u32 page_1_serial_from = journal_cjf_idxt_get_page_serial_from_index(jnl, 1);
1339                                 u32 starting_zone_serial = page_1_serial_from - 1;
1340                                 zdb_zone_lock((zdb_zone*)jnl->zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1341                                 /*ret = */zdb_zone_getserial((zdb_zone*)jnl->zone, &starting_zone_serial); // zone is locked
1342                                 zdb_zone_unlock((zdb_zone*)jnl->zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1343                                 // compare with start serial of second page
1344                                 if(serial_lt(starting_zone_serial, page_1_serial_from))
1345                                 {
1346                                     // cutting the first page will break continuity :stop reading IXFR now, apply the journal and try later.
1347                                     journal_cjf_page_output_stream_cancel(&os);
1348                                     goto journal_cjf_append_ixfr_stream_master_accum_exit;
1349                                 }
1350                             }
1351 
1352                             log_debug("cjf: %{dnsname}: %u of %u bytes available on a loop: loop", jnl->origin, available_from_beginning, ixfrinc.size);
1353 
1354                             // cutting now will allow to loop
1355 
1356                             if(FAIL(ret = journal_cjf_append_ixfr_stream_first_page_removal(jnl)))
1357                             {
1358                                 // could not remove first page
1359                                 break;
1360                             }
1361 
1362                             journal_cjf_page_output_stream_cancel(&os);
1363                             u32 tmp = jnl->file_maximum_size;
1364                             jnl->file_maximum_size = 0; // force the loop (thus removing the first page)
1365                             journal_cjf_idxt_append_page(jnl);
1366                             jnl->file_maximum_size = tmp;
1367                             output_stream_close(&os);
1368                             output_stream_set_void(&os); // very important
1369                             journal_cjf_page_output_stream_reopen(&os, jnl);
1370                         }
1371                         else
1372                         {
1373                             log_debug("cjf: %{dnsname}: %u/%u bytes available on a loop: continue", jnl->origin, available_from_beginning, ixfrinc.size);
1374 
1375                             // do a cut to allow to loop soon
1376 
1377                             journal_cjf_page_output_stream_cancel(&os);
1378                             u32 tmp = jnl->file_maximum_size;
1379                             jnl->file_maximum_size = MAX_U32;
1380                             journal_cjf_idxt_append_page(jnl);
1381                             jnl->file_maximum_size = tmp;
1382                             output_stream_close(&os);
1383                             output_stream_set_void(&os); // very important
1384                             journal_cjf_page_output_stream_reopen(&os, jnl);
1385                         }
1386                     }
1387                     else
1388                     {
1389                         // just proceed
1390                         log_debug("cjf: %{dnsname}: last page is empty: write on it", jnl->origin);
1391                     }
1392                 }
1393             }
1394         }
1395 
1396         // write the ixfr increment in the page
1397 
1398         input_stream bais;
1399         bytearray_input_stream_init_const(&bais, bytearray_output_stream_buffer(&ixfrinc.baos), bytearray_output_stream_size(&ixfrinc.baos));
1400         for(;;)
1401         {
1402             if((ret = dns_resource_record_read(&rr, &bais)) <= 0)
1403             {
1404                 if(ret != 0)
1405                 {
1406                     log_err("cjf: %{dnsname}: error re-reading record: %r", jnl->origin, ret);
1407                 }
1408                 break;
1409             }
1410 
1411 #if DEBUG
1412             log_debug("cjf: %{dnsname}: writing %{dnsrr}", jnl->origin, &rr);
1413 #endif
1414 
1415             if(FAIL(ret = journal_cfj_page_output_stream_write_resource_record(&os, &rr)))
1416             {
1417                 log_err("cjf: %{dnsname}: could not store record: %r", jnl->origin, ret);
1418                 break;
1419             }
1420         }
1421         input_stream_close(&bais);
1422 
1423         if(ISOK(ret))
1424         {
1425             if(journal_is_empty)
1426             {
1427                 jnl->serial_begin = ixfrinc.serial_from;
1428                 journal_cjf_clear_empty(jnl);
1429             }
1430 
1431             jnl->serial_end = ixfrinc.serial_to;
1432             journal_cjf_set_dirty(jnl);
1433 
1434             ++written_pages;
1435             journal_cjf_page_output_stream_next(&os);
1436 
1437             // if we wrote records after the expected position for the IDXT, move the IDXT position
1438 
1439             if(jnl->last_page.records_limit > jnl->page_table_file_offset)
1440             {
1441                 jnl->page_table_file_offset = jnl->last_page.records_limit;
1442                 jnl->idxt.dirty = TRUE;
1443                 journal_cjf_set_dirty(jnl);
1444             }
1445         }
1446         else
1447         {
1448             journal_cjf_page_output_stream_cancel(&os);
1449             break;
1450         }
1451     }
1452 
1453 journal_cjf_append_ixfr_stream_master_accum_exit:
1454     if(os.data != NULL)
1455     {
1456         output_stream_close(&os);
1457     }
1458 
1459     journal_cjf_read_ixfr_finalize(&ixfrinc);
1460     dns_resource_record_clear(&rr);
1461 
1462     if(written_pages > 0)
1463     {
1464         journal_cjf_page_cache_flush(jnl->file);
1465         journal_cjf_header_flush(jnl);
1466     }
1467 
1468     journal_cjf_writeunlock(jnl);
1469 
1470     if(ISOK(ret))
1471     {
1472         log_info("cjf: %{dnsname}: added %i incremental changes to the journal", jnl->origin, written_pages);
1473         ret = TYPE_IXFR;
1474     }
1475     else
1476     {
1477         log_err("cjf: %{dnsname}: append IXFR (master) failed with: %r", jnl->origin, ret);
1478     }
1479 
1480     return ret;
1481 }
1482 
1483 static ya_result
journal_cjf_append_ixfr_stream(journal * jh,input_stream * ixfr_wire_is)1484 journal_cjf_append_ixfr_stream(journal *jh, input_stream *ixfr_wire_is)
1485 {
1486     u8 zt;
1487     journal_cjf *jnl = (journal_cjf*)jh;
1488     ya_result ret = zdb_zone_info_get_zone_type(jnl->origin, &zt);
1489     if(ISOK(ret))
1490     {
1491         switch(zt)
1492         {
1493             case ZT_MASTER:
1494                 ret = journal_cjf_append_ixfr_stream_per_page(jh, ixfr_wire_is, FALSE);
1495                 break;
1496             case ZT_SLAVE:
1497                 ret = journal_cjf_append_ixfr_stream_per_page(jh, ixfr_wire_is, TRUE);
1498                 break;
1499             default:
1500                 ret = ERROR; // obsolete
1501                 break;
1502         }
1503     }
1504     return ret;
1505 }
1506 
1507 /******************************************************************************
1508  *
1509  * Journal Input Stream
 * This one returns an IXFR stream
1511  *
1512  ******************************************************************************/
1513 
// Allocation tag of journal_cjf_input_stream_data; read as little-endian bytes it spells "JCJFISDT"
#define JCJFISDT_TAG 0x54445349464a434a

/*
 * State of the input stream returned by journal_cjf_get_ixfr_stream_at_serial.
 * The stream walks the pages listed in the IDXT and returns their DNS records.
 */

struct journal_cjf_input_stream_data
{
    // the journal being streamed: acquired when the stream is created, read-unlocked and released on close
    journal_cjf *jnl;

    // a dedicated read-only handle on the journal file (closed on close)
    file_pool_file_t file;
    // bytes of the current page's stream not yet returned to the reader
    u32 available;

    // the serial the IXFR stream starts from (locates the start inside the first page)
    u32 serial_from;
    // next_page_offset of the current page header; only recorded by the read callback
    u32 page_next;      // DEBUG

    // index of the next IDXT entry (page) to stream
    u16 idxt_index;
    // number of IDXT entries when the stream was opened: reaching it is the end of the stream
    u16 idxt_size;

    // NOTE(review): not referenced by the stream callbacks in this part of the file — confirm purpose
    u16 todo_soa_record_size;
    // TRUE until the first page has been positioned on serial_from; later pages start right after their PAGE table
    bool first_stream;

    // NOTE(review): not referenced by the stream callbacks in this part of the file — confirm purpose/ownership
    u8* todo_soa_record;
};

typedef struct journal_cjf_input_stream_data journal_cjf_input_stream_data;
1536 
1537 static ya_result
journal_cjf_input_stream_read(input_stream * stream,void * buffer_,u32 len)1538 journal_cjf_input_stream_read(input_stream* stream, void *buffer_, u32 len)
1539 {
1540     journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)stream->data;
1541     u8 *buffer = (u8*)buffer_;
1542     const u8 *base = buffer;
1543     const u8 *limit = &buffer[len];
1544     intptr n;
1545     ya_result ret = 0;
1546 
1547     journal_cjf *jnl = data->jnl;
1548 
1549     log_debug("cjf: %s,%p: input: reading %u/%u bytes, pos is %lli", jnl->journal_file_name, jnl->file,
1550             len, data->available, file_pool_seek(data->file, 0, SEEK_CUR));
1551 
1552     // while there is still room in the output buffer
1553 
1554     while((n = limit - buffer) > 0)
1555     {
1556         // if there is no data ready on input, fetch some more
1557 
1558         if(data->available == 0)
1559         {
1560             // get the next one
1561 
1562             if(data->idxt_index == data->idxt_size)
1563             {
1564                 // EOF : we were at the last index in the IDXT
1565                 break;
1566             }
1567 
1568             // get the offset of the current PAGE table
1569 
1570             u32 page_offset = journal_cjf_idxt_get_file_offset(data->jnl, data->idxt_index);
1571             u32 stream_offset;
1572             u32 stream_limit_offset;
1573 
1574             // look for the first SOA requested
1575 
1576             if(!data->first_stream)
1577             {
1578                 // we are already streaming, the XFR stream starts at the end of the PAGE (4096 bytes until next version of the journal)
1579 
1580                 stream_offset = page_offset + CJF_PAGE_SIZE_IN_BYTE;
1581             }
1582             else
1583             {
1584                 // the starting stream offset is obtained through a bit more work
1585 
1586                 if(FAIL(ret = journal_cjf_page_get_stream_offset_from_serial(data->jnl, data->idxt_index, data->serial_from, &stream_offset)))
1587                 {
1588                     return ret;
1589                 }
1590 
1591                 data->first_stream = FALSE;
1592             }
1593 
1594             journal_cjf_page_tbl_header page_header;
1595             journal_cjf_page_cache_read_header(data->jnl->file, page_offset, &page_header);
1596 
1597             if(page_header.count == 0)
1598             {
1599                 // empty page, proably not flushed
1600                 break;
1601             }
1602 
1603             stream_limit_offset = page_header.stream_end_offset;
1604 
1605             // we know where to start ...
1606 
1607             data->idxt_index++;
1608 
1609             (void)stream_limit_offset;
1610 
1611 #if DEBUG
1612             if(stream_limit_offset == 0)
1613             {
1614                 log_err("impossible limit value read from the journal");
1615                 journal_cjf_page_cache_read_header(data->jnl->file, page_offset, &page_header);
1616             }
1617 #endif
1618 
1619             yassert(stream_limit_offset != 0);
1620             yassert(stream_limit_offset > page_offset);
1621 
1622             data->available = page_header.stream_end_offset - stream_offset;
1623             data->page_next = page_header.next_page_offset;
1624 
1625             if(file_pool_seek(data->file, stream_offset, SEEK_SET) < 0)
1626             {
1627                 return ERRNO_ERROR;
1628             }
1629         }
1630 
1631         n = MIN(n, data->available);
1632 
1633         if(FAIL(ret = file_pool_readfully(data->file, buffer, n)))
1634         {
1635             return ret;
1636         }
1637 
1638         data->available -= n;
1639         buffer += n;
1640     }
1641 
1642     return buffer - base;
1643 }
1644 
1645 static ya_result
journal_cjf_input_stream_skip(input_stream * is,u32 len)1646 journal_cjf_input_stream_skip(input_stream* is, u32 len)
1647 {
1648     u8 tmp[512];
1649 
1650     journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)is->data;
1651     journal_cjf *jnl = data->jnl;
1652     log_debug("cjf: %s,%p: input: skipping %u bytes", jnl->journal_file_name, jnl->file, len);
1653 
1654     while(len > 0)
1655     {
1656         ya_result ret;
1657         u32 n = MIN(len, sizeof(tmp));
1658         if(FAIL(ret = journal_cjf_input_stream_read(is, tmp, n)))
1659         {
1660             return ret;
1661         }
1662 
1663         len -= n;
1664     }
1665 
1666     return len;
1667 }
1668 
1669 static void
journal_cjf_input_stream_close(input_stream * is)1670 journal_cjf_input_stream_close(input_stream* is)
1671 {
1672     journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)is->data;
1673 
1674     log_debug("cjf: %s,%p: input: close (%p)", data->jnl->journal_file_name, data->jnl->file, data->file);
1675     journal_cjf_readunlock(data->jnl);
1676     journal_cjf_release(data->jnl);
1677     file_pool_close(data->file);
1678     ZFREE_OBJECT(data);
1679 
1680     input_stream_set_void(is);
1681 }
1682 
/*
 * Callbacks of the input stream returned by journal_cjf_get_ixfr_stream_at_serial:
 * read, skip, close, followed by a name string
 * (presumably the stream class name — confirm against input_stream_vtbl).
 */
static const input_stream_vtbl journal_cjf_input_stream_vtbl =
{
    journal_cjf_input_stream_read,
    journal_cjf_input_stream_skip,
    journal_cjf_input_stream_close,
    "journal_cjf_input_stream"
};
1690 
1691 /*
1692  * the last_soa_rr is used for IXFR transfers (it has to be a prefix & suffix to the returned stream)
1693  */
1694 
1695 static ya_result
journal_cjf_get_ixfr_stream_at_serial(journal * jh,u32 serial_from,input_stream * out_input_stream,dns_resource_record * out_last_soa_rr)1696 journal_cjf_get_ixfr_stream_at_serial(journal *jh, u32 serial_from, input_stream *out_input_stream, dns_resource_record *out_last_soa_rr)
1697 {
1698     journal_cjf *jnl = (journal_cjf*)jh;
1699 
1700     log_debug("cjf: %s,%p: get IXFR stream at serial %i", jnl->journal_file_name, jnl->file, serial_from);
1701 
1702     journal_cjf_readlock(jnl);
1703 
1704     if(serial_lt(serial_from, jnl->serial_begin) || serial_ge(serial_from, jnl->serial_end))
1705     {
1706         if(serial_from == jnl->serial_end)
1707         {
1708             log_debug("cjf: %s,%p: the journal ends at %i, returning empty stream", jnl->journal_file_name, jnl->file, serial_from);
1709             journal_cjf_readunlock(jnl);
1710             empty_input_stream_init(out_input_stream);
1711             return SUCCESS; // 0
1712         }
1713         else
1714         {
1715             log_debug("cjf: %s,%p: the journal ends at %i, returning empty stream", jnl->journal_file_name, jnl->file, serial_from);
1716             journal_cjf_readunlock(jnl);
1717 #if DEBUG
1718             logger_flush();
1719 #endif
1720             return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1721         }
1722     }
1723 
1724     ya_result ret;
1725     dns_resource_record rr;
1726 
1727     dns_resource_record_init(&rr);
1728 
1729     // increment the reference count of the journal
1730     // lock the range in the file so it cannot be overwritten
1731     // create a stream that know where to start, where to end
1732     // it has to first send the last SOA
1733     // then to send the start
1734     // then to send the last SOA again
1735 
1736     if(FAIL(ret = journal_cjf_idxt_get_page_index_from_serial(jnl, serial_from)))
1737     {
1738         journal_cjf_readunlock(jnl);
1739 
1740         return ret;
1741     }
1742 
1743     yassert(ret < MAX_U16);
1744 
1745     u16 idxt_index = (u16)ret;
1746 
1747     journal_cjf_input_stream_data *data;
1748     ZALLOC_OBJECT_OR_DIE(data, journal_cjf_input_stream_data, JCJFISDT_TAG);
1749     journal_acquire((journal*)jnl);
1750     data->jnl = jnl;
1751 
1752     data->file = file_pool_open_ex(journal_file_pool, jnl->journal_file_name, O_RDONLY|O_CLOEXEC, 0660);
1753 
1754     if(data->file == NULL)
1755     {
1756         // the journal doess not exist (anymore ?)
1757         ZFREE_OBJECT(data);
1758 
1759         journal_cjf_readunlock(jnl);
1760         journal_cjf_release(jnl);
1761 
1762         return MAKE_ERRNO_ERROR(ENOENT);
1763     }
1764 
1765     data->serial_from = serial_from;
1766 
1767     if(out_last_soa_rr != NULL)
1768     {
1769         yassert(jnl->last_soa_offset != 0);
1770         // read the last SOA
1771 
1772         size_t from = ~0;
1773 
1774         file_pool_tell(data->file, &from);
1775 
1776         file_pool_seek(data->file, jnl->last_soa_offset, SEEK_SET);
1777 
1778         input_stream tmp;
1779         file_pool_file_input_stream_init(&tmp, data->file);
1780         ret = dns_resource_record_read(out_last_soa_rr, &tmp);
1781         file_pool_file_input_stream_detach(&tmp);
1782 
1783         file_pool_seek(data->file, from, SEEK_SET);
1784 
1785         if(FAIL(ret))
1786         {
1787             journal_cjf_readunlock(jnl);
1788             journal_cjf_release(jnl);
1789 
1790             log_err("cjf: %s,%p: unable to read the SOA for serial %u at position %u: %r", jnl->journal_file_name, jnl->file, serial_from, jnl->last_soa_offset, ret);
1791             ZFREE_OBJECT(data);
1792             return ret;
1793         }
1794     }
1795 
1796     data->idxt_index = idxt_index;
1797     data->idxt_size = jnl->idxt.count;
1798     data->available = 0;
1799 
1800     data->first_stream = TRUE;
1801 
1802     out_input_stream->data = data;
1803     out_input_stream->vtbl = &journal_cjf_input_stream_vtbl;
1804 
1805     return ret;
1806 
1807     /*
1808      * In page_begin.file_offset, we get the first PAGE table
1809      *
1810      * That table may chain to a next PAGE, and so on and so forth
1811      * While this is happening, every stream between offsets:
1812      *
1813      * page_begin.file_offset + CJF_SECTION_INDEX_SIZE
1814      *
1815      * and
1816      *
1817      * @(page_begin.file_offset + 4)
1818      *
1819      * is to be sent by the stream
1820      *
1821      * When @(page_begin.file_offset + 4) is 0, it is the last PAGE
1822      *
1823      * Note that @(page_begin.file_offset + 4) is cached in the IDXT entries
1824      *
1825      * Every PAGE table but the last one has exactly CJF_SECTION_INDEX_SLOT_COUNT items
1826      *
1827      * jnl->page.count contains the count of items of the current (not full) PAGE
1828      * jnl->page.offset_next contains the supremum of input after the last PAGE
1829      *
1830      * This means a journal has to be fully initialised before being read (it was not the case for an IX file)
1831      *
1832      * The content of the PAGE itself is not required.  Only the DNS part matters.
1833      *
1834      * All this also means a journal has to be flushed for its DNS on disk (since the file has to be opened separately because a cloned fd shares the file pointer)
1835      *
1836      * A range-locking mechanism is clearly needed. It should only be capable of locking up to two ranges (covers all cases).
1837      *
1838      * So here, in summary, return a stream that is linked to the journal
1839      *
1840      * It will start at offset:
1841      *
1842      * idxt[ret].file_offset + CJF_SECTION_INDEX_SIZE
1843      *
1844      * until:
1845      *
1846      * idxt[ret + 1].file_offset or page.offset_next
1847      *
1848      * and continue that way for every ret < idxt.count
1849      *
1850      */
1851 }
1852 
1853 static ya_result
journal_cjf_get_first_serial(journal * jh,u32 * serial)1854 journal_cjf_get_first_serial(journal *jh, u32 *serial)
1855 {
1856     ya_result ret = BUFFER_WOULD_OVERFLOW;
1857     journal_cjf *jnl = (journal_cjf*)jh;
1858 
1859     journal_cjf_readlock(jnl);
1860 
1861     u32 value = jnl->serial_begin;
1862 
1863     if(serial != NULL)
1864     {
1865         *serial = value;
1866         ret = SUCCESS;
1867     }
1868 
1869     journal_cjf_readunlock(jnl);
1870 
1871     log_debug("cjf: %s,%p: get first serial: %i", jnl->journal_file_name, jnl->file, value);
1872 
1873     return ret;
1874 }
1875 
1876 static ya_result
journal_cjf_get_last_serial(journal * jh,u32 * serial)1877 journal_cjf_get_last_serial(journal *jh, u32 *serial)
1878 {
1879     ya_result ret = BUFFER_WOULD_OVERFLOW;
1880     journal_cjf *jnl = (journal_cjf*)jh;
1881 
1882     journal_cjf_readlock(jnl);
1883 
1884     u32 value = jnl->serial_end;
1885 
1886     if(serial != NULL)
1887     {
1888         *serial = value;
1889         ret = SUCCESS;
1890     }
1891 
1892     log_debug("cjf: %s,%p: get last serial: %i", jnl->journal_file_name, jnl->file, value);
1893 
1894     journal_cjf_readunlock(jnl);
1895 
1896     return ret;
1897 }
1898 
1899 static ya_result
journal_cjf_get_serial_range(journal * jh,u32 * serial_start,u32 * serial_end)1900 journal_cjf_get_serial_range(journal *jh, u32 *serial_start, u32 *serial_end)
1901 {
1902     journal_cjf *jnl = (journal_cjf*)jh;
1903 
1904     journal_cjf_readlock(jnl);
1905 
1906     if(serial_start != NULL)
1907     {
1908         *serial_start = jnl->serial_begin;
1909     }
1910     if(serial_end != NULL)
1911     {
1912         *serial_end = jnl->serial_end;
1913     }
1914 
1915     journal_cjf_readunlock(jnl);
1916 
1917     return SUCCESS;
1918 }
1919 
1920 static ya_result
journal_cjf_truncate_to_size(journal * jh,u32 size_)1921 journal_cjf_truncate_to_size(journal *jh, u32 size_)
1922 {
1923     journal_cjf *jnl = (journal_cjf*)jh;
1924 
1925     if(size_ == 0)
1926     {
1927         journal_cjf_writelock(jnl);
1928 
1929         log_debug("cjf: %s,%p: truncate to size 0", jnl->journal_file_name, jnl->file);
1930 
1931         if(jnl->file == NULL)
1932         {
1933             journal_cjf_page_cache_close(jnl->file);
1934             file_pool_close(jnl->file);
1935             jnl->file = NULL;
1936         }
1937         file_pool_unlink_from_pool_and_filename(journal_file_pool, jnl->journal_file_name);
1938 
1939         jnl->idxt.dirty = FALSE;
1940         journal_cjf_idxt_destroy(jnl);
1941 
1942         jnl->file_maximum_size = MAX_U32;
1943         if(jnl->zone != NULL)
1944         {
1945             jnl->file_maximum_size = jnl->zone->wire_size >> 1;
1946             zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
1947         }
1948 
1949         jnl->file = NULL;
1950 
1951         jnl->last_page.file_offset = CJF_HEADER_SIZE;
1952         jnl->last_page.count = 0;
1953         jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
1954         jnl->last_page.serial_start = 0;
1955         jnl->last_page.serial_end = 0;
1956         jnl->last_page.records_limit = CJF_HEADER_SIZE + CJF_SECTION_INDEX_SIZE;
1957         jnl->last_page.file_offset_limit = jnl->file_maximum_size;
1958 
1959         jnl->serial_begin = 0;
1960         jnl->serial_end = 0;
1961         jnl->first_page_offset = CJF_HEADER_SIZE;
1962         jnl->page_table_file_offset = 0;
1963         jnl->last_soa_offset = 0;
1964         //jnl->file_maximum_size = MAX_U32;
1965 
1966         //jnl->mtx.owner = LOCK_NONE;
1967         //jnl->mtx.count = 0;
1968 
1969         jnl->flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN;
1970 
1971         jnl->last_page.records_limit = jnl->last_page.file_offset + CJF_SECTION_INDEX_SIZE;
1972         jnl->last_page.file_offset_limit = jnl->file_maximum_size;
1973 
1974         //jnl->journal_file_name = strdup(filename);
1975 
1976         journal_cjf_writeunlock(jnl);
1977 
1978         return SUCCESS;
1979     }
1980     else
1981     {
1982         log_err("cjf: %s,%p: truncate to size != 0 not implemented", jnl->journal_file_name, jnl->file);
1983 
1984         return ZDB_JOURNAL_FEATURE_NOT_SUPPORTED;
1985     }
1986 }
1987 
1988 static ya_result
journal_cjf_truncate_to_serial(journal * jh,u32 serial_)1989 journal_cjf_truncate_to_serial(journal *jh, u32 serial_)
1990 {
1991     journal_cjf *jnl = (journal_cjf*)jh;
1992     (void)serial_;
1993     journal_cjf_readlock(jnl);
1994     log_err("cjf: %s,%p: truncate to serial not implemented", jnl->journal_file_name, jnl->file);
1995     journal_cjf_readunlock(jnl);
1996 
1997     return ZDB_JOURNAL_FEATURE_NOT_SUPPORTED;
1998 }
1999 
2000 /**
2001  *
2002  * @param jnl
2003  * @return
2004  */
2005 
2006 static ya_result
journal_cjf_reopen(journal * jh)2007 journal_cjf_reopen(journal *jh)
2008 {
2009 #if 0 /* fix */
2010 #else
2011     return SUCCESS;
2012 #endif
2013 }
2014 
2015 static void
journal_cjf_flush(journal * jh)2016 journal_cjf_flush(journal *jh)
2017 {
2018     journal_cjf *jnl = (journal_cjf*)jh;
2019 
2020     log_debug("cjf: %s,%p: flush", jnl->journal_file_name, jnl->file);
2021 
2022     journal_cjf_writelock(jnl);
2023 
2024 #if ZDB_ZONE_HAS_JNL_REFERENCE
2025     zdb_zone *zone;
2026     if((zone = (zdb_zone*)jnl->zone) != NULL)
2027     {
2028         yassert(zone->journal == jh);
2029         zone->journal = NULL;
2030     }
2031 #endif
2032 
2033     log_debug3("cjf: %s,%p: flushing to file", jnl->journal_file_name, jnl->file);
2034 
2035     log_debug3("cjf: %s,%p: flushing to file: flushing PAGE cache", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2036     journal_cjf_page_cache_flush(jnl->file);
2037     log_debug3("cjf: %s,%p: flushing to file: flushing IDXT", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2038     journal_cjf_idxt_flush(jnl);
2039     log_debug3("cjf: %s,%p: flushing to file: flushing header", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2040     journal_cjf_header_flush(jnl);
2041 
2042     journal_cjf_writeunlock(jnl);
2043 }
2044 
2045 static ya_result
journal_cjf_close(journal * jh)2046 journal_cjf_close(journal *jh)
2047 {
2048     journal_cjf *jnl = (journal_cjf*)jh;
2049 
2050     log_debug("cjf: %s,%p: close", jnl->journal_file_name, jnl->file);
2051 
2052     journal_cjf_writelock(jnl);
2053 
2054 #if ZDB_ZONE_HAS_JNL_REFERENCE
2055     zdb_zone *zone;
2056     if((zone = (zdb_zone*)jnl->zone) != NULL)
2057     {
2058         yassert(zone->journal == jh);
2059         zone->journal = NULL;
2060     }
2061 #endif
2062 
2063     log_debug3("cjf: %s,%p: closing file", jnl->journal_file_name, jnl->file);
2064 
2065     if(jnl->file != NULL)
2066     {
2067         log_debug3("cjf: %s,%p: closing file: closing PAGE cache", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2068         journal_cjf_page_cache_close(jnl->file);
2069         log_debug3("cjf: %s,%p: closing file: flushing IDXT", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2070         journal_cjf_idxt_flush(jnl);
2071         log_debug3("cjf: %s,%p: closing file: flushing header", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2072         journal_cjf_header_flush(jnl);
2073         log_debug3("cjf: %s,%p: closing file: closing file", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2074 
2075         journal_cjf_idxt_destroy(jnl);
2076 
2077         if(jnl->zone != NULL)
2078         {
2079             log_info("zone: %{dnsname}: closing journal file '%s'", jnl->origin, jnl->journal_file_name);
2080         }
2081         else
2082         {
2083             log_info("zone: <notset>: closing journal file '%s'", jnl->journal_file_name);
2084         }
2085 
2086         file_pool_close(jnl->file);
2087         jnl->file = NULL;
2088     }
2089 
2090     journal_cjf_writeunlock(jnl);
2091 
2092     return SUCCESS;
2093 }
2094 
2095 static void
journal_cjf_log_dump(journal * jh)2096 journal_cjf_log_dump(journal *jh)
2097 {
2098     journal_cjf *jnl = (journal_cjf*)jh;
2099     journal_cjf_readlock(jnl);
2100     log_debug("cjf: %s,%p: [%u; %u] '%s' (%i) lck=%i rc=%i", jnl->journal_file_name, jnl->file, jnl->serial_begin, jnl->serial_end, jnl->journal_file_name, jnl->file, jnl->mtx.owner, jnl->mtx.count);
2101     journal_cjf_readunlock(jnl);
2102 }
2103 
2104 static ya_result
journal_cjf_get_domain(journal * jh,u8 * out_domain)2105 journal_cjf_get_domain(journal *jh, u8 *out_domain)
2106 {
2107     journal_cjf *jnl = (journal_cjf*)jh;
2108 
2109     // don't: journal_cjf_readlock(jnl); as the field is constant until the destruction of the journal
2110 
2111     dnsname_copy(out_domain, jnl->origin);
2112     return SUCCESS;
2113 }
2114 
2115 /**
2116  * Links a zdb_zone and a journal
2117  *
2118  * @param jh
2119  * @param zone
2120  */
2121 
2122 static void
journal_cjf_link_zone(journal * jh,zdb_zone * zone)2123 journal_cjf_link_zone(journal *jh, zdb_zone *zone)
2124 {
2125     journal_cjf *jnl = (journal_cjf*)jh;
2126 
2127     journal_cjf_writelock(jnl);
2128 
2129     if(jnl->zone != zone)
2130     {
2131         jnl->file_maximum_size = MAX_U32;
2132 
2133 #if !ZDB_ZONE_HAS_JNL_REFERENCE
2134         if(jnl->zone != NULL)
2135         {
2136             log_debug("cjf: %s,%p: unlinking zone %{dnsname},%p", jnl->journal_file_name, jnl->file, jnl->zone->origin, jnl->zone);
2137 
2138             zdb_zone_release((zdb_zone*)jnl->zone); //jnl->zone = NULL;
2139         }
2140 
2141         if(zone != NULL)
2142         {
2143             zdb_zone_acquire(zone);
2144 
2145             log_debug("cjf: %s,%p: linking to zone %{dnsname},%p", jnl->journal_file_name, jnl->file, zone->origin, zone);
2146 
2147             jnl->file_maximum_size = zone->wire_size >> 1;
2148         }
2149 #endif
2150         jnl->zone = zone;
2151 
2152         zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
2153 
2154         jnl->last_page.file_offset_limit = jnl->file_maximum_size;
2155     }
2156 
2157     journal_cjf_writeunlock(jnl);
2158 }
2159 
2160 static void
journal_cjf_destroy(journal * jh)2161 journal_cjf_destroy(journal *jh)
2162 {
2163     journal_cjf *jnl = (journal_cjf*)jh;
2164 
2165     yassert(jnl->rc == 0);
2166 
2167     log_debug("cjf: %s,%p: destroy", jnl->journal_file_name, jnl->file);
2168 
2169     journal_cjf_link_zone(jh, NULL);
2170 
2171     shared_group_mutex_destroy(&jnl->mtx);
2172     free(jnl->origin);
2173     free(jnl->journal_file_name);
2174 
2175 #if DEBUG
2176     memset(jnl, 0xfe, sizeof(journal_cjf));
2177     jnl->mru = FALSE;
2178 #endif
2179 
2180     ZFREE_OBJECT(jnl);
2181 }
2182 
2183 static const u8 *
journal_cjf_get_domain_const(journal * jh)2184 journal_cjf_get_domain_const(journal *jh)
2185 {
2186     journal_cjf *jnl = (journal_cjf*)jh;
2187     return jnl->origin;
2188 }
2189 
2190 /*******************************************************************************
2191  *
2192  * vtbl handling functions
2193  *
2194  ******************************************************************************/
2195 
/*
 * Method table binding the CJF implementation to the generic journal interface.
 * The initializer is positional: the order of the entries must follow the field
 * order of struct journal_vtbl (declared elsewhere).
 */
struct journal_vtbl journal_cjf_vtbl =
{
    journal_cjf_get_format_name,
    journal_cjf_get_format_version,
    journal_cjf_reopen,
    journal_cjf_flush,
    journal_cjf_close,
    journal_cjf_append_ixfr_stream,
    journal_cjf_get_ixfr_stream_at_serial,
    journal_cjf_get_first_serial,
    journal_cjf_get_last_serial,
    journal_cjf_get_serial_range,
    journal_cjf_truncate_to_size,
    journal_cjf_truncate_to_serial,
    journal_cjf_log_dump,
    journal_cjf_get_domain,
    journal_cjf_destroy,
    journal_cjf_link_zone,
    journal_cjf_get_domain_const,
    JOURNAL_CLASS_NAME
};
2217 
2218 ya_result
journal_cjf_load_index_table(journal * jh)2219 journal_cjf_load_index_table(journal *jh)
2220 {
2221     journal_cjf *jnl = (journal_cjf*)jh;
2222 
2223     // check if the index table (referencing all indexes)
2224 
2225     if(jnl->page_table_file_offset == 0)
2226     {
2227         // the table does not exist or is corrupted
2228         // it has to be read again
2229         // from first_index_offset the index list has to be followed to recreate it
2230         // if such table only contains one entry, it could probably be ignored in most cases
2231     }
2232     else
2233     {
2234         // seek and load the table
2235     }
2236 
2237     return -1;
2238 }
2239 
2240 static journal_cjf*
journal_cjf_alloc_default(const u8 * origin,const char * filename)2241 journal_cjf_alloc_default(const u8 *origin, const char *filename)
2242 {
2243     journal_cjf *jnl;
2244     ZALLOC_OBJECT_OR_DIE(jnl, journal_cjf, JRNLCJF_TAG);
2245     ZEROMEMORY(jnl, sizeof(journal_cjf));
2246     jnl->vtbl = &journal_cjf_vtbl;
2247     jnl->mru_node.data = jnl;
2248     jnl->file = NULL;
2249     jnl->file_maximum_size = MAX_U32;
2250     jnl->first_page_offset = CJF_HEADER_SIZE;
2251     jnl->origin = dnsname_dup(origin);
2252     jnl->journal_file_name = strdup(filename);
2253     jnl->last_page.file_offset = CJF_HEADER_SIZE;
2254     jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
2255     jnl->last_page.records_limit = CJF_HEADER_SIZE + CJF_SECTION_INDEX_SIZE;
2256     jnl->last_page.file_offset_limit = jnl->file_maximum_size;
2257     jnl->flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN|JOURNAL_CFJ_FLAGS_UNINITIALISED;
2258     shared_group_mutex_init(&jnl->mtx, &journal_shared_mtx, "journal-cjf");
2259     return jnl;
2260 }
2261 /**
2262  * The caller guarantees not to call this on an already opened journal
2263  *
2264  * Should not be called directly (only by journal_* functions.
2265  *
2266  * Opens or create a journal handling structure.
2267  * If the journal did not exist, the structure is returned without a file opened
2268  *
2269  * @param jh
2270  * @param origin
2271  * @param workingdir
2272  * @param create
2273  *
2274  * @return
2275  */
2276 
2277 ya_result
journal_cjf_open_file(journal ** jhp,const char * filename,const u8 * origin,bool create)2278 journal_cjf_open_file(journal **jhp, const char *filename, const u8* origin, bool create)
2279 {
2280     // CFJ_PAGE_CACHE ->
2281     if(!journal_initialized)
2282     {
2283         journal_cjf_page_cache_init();
2284 
2285         shared_group_shared_mutex_init(&journal_shared_mtx);
2286 
2287         journal_file_pool = file_pool_init("journal-file-pool", 256);
2288 
2289         journal_initialized = TRUE;
2290     }
2291 
2292     journal_cjf *jnl = NULL;
2293     ya_result ret;
2294 
2295     if(file_exists(filename) || create)
2296     {
2297         // instantiate and open the journal
2298 
2299         ret = journal_cjf_init_from_file(&jnl, origin, filename, create);
2300 
2301         if(ISOK(ret))
2302         {
2303             yassert(jnl != NULL); // to help scan-build
2304 
2305             if(!((jnl->serial_begin == 0) && (jnl->serial_begin == jnl->serial_end))) // scan-build false-positive : if ISOK(ret) => jnl != NULL
2306             {
2307                 journal_cjf_load_idxt(jnl);
2308             }
2309         }
2310         else
2311         {
2312             if(create)
2313             {
2314                 log_err("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
2315             }
2316             else
2317             {
2318                 log_debug("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
2319             }
2320 
2321             if(jnl != NULL)
2322             {
2323                 journal_cjf_destroy((journal*)jnl);
2324 #if DEBUG
2325                 log_debug("cjf: %{dnsname}: journal file cannot be opened/created", origin);
2326 #endif
2327             }
2328 
2329             return ZDB_ERROR_ICMTL_NOTFOUND;
2330         }
2331 
2332 #if DEBUG
2333         log_debug("cjf: %{dnsname}: journal opened", origin);
2334 #endif
2335         *jhp = (journal*)jnl;
2336 
2337         return SUCCESS;
2338     }
2339     else
2340     {
2341 #if DEBUG
2342         log_debug("cjf: %{dnsname}: journal file not found", origin);
2343 #endif
2344         return ZDB_ERROR_ICMTL_NOTFOUND;
2345     }
2346 }
2347 
2348 /**
2349  * The caller guarantees not to call this on an already opened journal
2350  *
2351  * Should not be called directly (only by journal_* functions.
2352  *
2353  * Opens or create a journal handling structure.
2354  * If the journal did not exist, the structure is returned without a file opened
2355  *
2356  * @param jh
2357  * @param origin
2358  * @param workingdir
2359  * @param create
2360  *
2361  * @return
2362  */
2363 
2364 ya_result
journal_cjf_open(journal ** jhp,const u8 * origin,const char * workingdir,bool create)2365 journal_cjf_open(journal **jhp, const u8* origin, const char *workingdir, bool create)
2366 {
2367     // CFJ_PAGE_CACHE <-
2368 
2369     ya_result ret;
2370 
2371     *jhp = NULL;
2372 
2373     // generate the file name
2374 
2375     char filename[PATH_MAX];
2376 
2377     if((jhp == NULL) || (origin == NULL) || (workingdir == NULL))
2378     {
2379         return ZDB_JOURNAL_WRONG_PARAMETERS;
2380     }
2381 
2382 #if DEBUG
2383     log_debug("cjf: trying to open journal for %{dnsname} in '%s'", origin, workingdir);
2384 #endif
2385 
2386     /* get the soa of the loaded zone */
2387 
2388     if(FAIL(ret = snformat(filename, sizeof(filename), CJF_WIRE_FILE_FORMAT, workingdir, origin)))
2389     {
2390 #if DEBUG
2391         log_debug("cjf: %{dnsname}: journal file name is too long", origin);
2392 #endif
2393         return ret;
2394     }
2395 
2396     ret = journal_cjf_open_file(jhp, filename, origin, create);
2397 
2398     return ret;
2399 }
2400 
2401 void
journal_cjf_finalize()2402 journal_cjf_finalize()
2403 {
2404     journal_cjf_page_cache_finalize();
2405     file_pool_finalize(journal_file_pool);
2406 }
2407 
2408 #endif
2409 
2410 /** @} */
2411