1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 /** @defgroup
36 * @ingroup
37 * @brief
38 *
39 *
40 *
41 * @{
42 *
43 *----------------------------------------------------------------------------*/
44
45 /*------------------------------------------------------------------------------
46 * GLOBAL VARIABLES */
47
48 /*------------------------------------------------------------------------------
49 * STATIC PROTOTYPES */
50
51 /*------------------------------------------------------------------------------
52 * FUNCTIONS */
53
54 /** @brief Function ...
55 *
56 * ...
57 *
58 * @param ...
59 *
60 * @retval OK
61 * @retval NOK
62 */
63
64
65 #include "dnsdb/dnsdb-config.h"
66
67 #define ZDB_JOURNAL_CODE 1
68
69 #include "dnsdb/journal.h"
70
71 #if JOURNAL_CJF_ENABLED
72
73 #define JOURNAL_CJF_BASE 1
74
75 #include "dnsdb/journal-cjf-page.h"
76 #include "dnsdb/journal-cjf-page-cache.h"
77 #include "dnsdb/journal-cjf-page-output-stream.h"
78 #include "dnsdb/journal-cjf-idxt.h"
79 #include "dnsdb/journal-cjf-common.h"
80 #include "dnsdb/journal-cjf.h"
81
82 #include <sys/types.h>
83 #include <sys/stat.h>
84 #include <unistd.h>
85 #include <dirent.h>
86 #include <fcntl.h>
87
88 #include <dnscore/file_input_stream.h>
89 #include <dnscore/empty-input-stream.h>
90 #include <dnscore/mutex.h>
91 #include <dnscore/serial.h>
92 #include <dnscore/dns_resource_record.h>
93 #include <dnscore/format.h>
94
95 #include <dnscore/ptr_set.h>
96 #include <dnscore/fdtools.h>
97
98 #include <dnscore/u32_set.h>
99 #include <dnscore/list-dl.h>
100
101 #include <dnscore/ctrl-rfc.h>
102
103 #include <dnscore/bytearray_output_stream.h>
104 #include <dnscore/bytearray_input_stream.h>
105 #include <dnscore/zalloc.h>
106
107 #include "dnsdb/zdb_error.h"
108 #include "dnsdb/zdb_utils.h"
109 #include "dnsdb/journal.h"
110 #include "dnsdb/zdb_types.h"
111 #include "dnsdb/xfr_copy.h"
112 #include "dnsdb/zdb-zone-path-provider.h"
113 #include "dnsdb/zdb_zone.h"
114
115 extern logger_handle* g_database_logger;
116 #define MODULE_MSG_HANDLE g_database_logger
117
118 #define DEBUG_JOURNAL 1
119 #if !DEBUG
120 #undef DEBUG_JOURNAL
121 #define DEBUG_JOURNAL 0
122 #endif
123
124 #define JOURNAL_FORMAT_NAME "cyclic"
125 #define VERSION_HI 0
126 #define VERSION_LO 1
127 #define JOURNAL_CLASS_NAME "journal_cjf"
128
129 #define LOCK_NONE 0
130 #define LOCK_READ 1
131 #define LOCK_WRITE 2
132
133 #define CJF_EXT "cjf"
134 #define CJF_EXT_STRLEN 3
135
136 #define SOA_RDATA_SIZE_MAX 532
137
138 #define DO_SYNC 1
139
140 #define JRNLCJF_TAG 0x58494c4e524a
141
142 /**
143 * Two steps means that the journal is written in two passes.
144 * Pass 1 gathers a full page from input and validates it.
145 * Pass 2 stores it to the journal file.
146 */
147
148 #define CJF_USE_TWO_STEPS 1
149
150 /*
151 * Contains the journal (almost: not the matching start and end SOA)
152 */
153
154 #define CJF_WIRE_FILE_FORMAT "%s/%{dnsname}." CJF_EXT
155 #define FIRST_FROM_END (CJF_EXT_STRLEN + (1 + 8 + 1 + 8))
156 #define LAST_FROM_END (CJF_EXT_STRLEN + (1 + 8))
157
158 /*******************************************************************************
159 *
160 * JNL (HEADER) ---> IDXT
161 * | |
162 * +------+------+---+
163 * | | |
164 * v v v
165 * PAGE -> PAGE -> PAGE
166 * | | |
167 * v v v
168 * IXFRs IXFRs IXFRs
169 *
170 ******************************************************************************/
171
172 /*
173 * MAGIC 'JNL' + Version 0
174 * Serial begin
175 * Serial end
176 * Begin Index Offset
177 * Table Index Offset
178 */
179
180
181 /**
182 * There is a need of lockable 4K pages in an MRU that points back to their user
183 * That's where the PAGE will be stored
184 * I'm not sure of what the ratio between allowed FDs and allowed PAGE pages should be.
185 */
186
187 static shared_group_shared_mutex_t journal_shared_mtx;
188 static bool journal_initialized = FALSE;
189
190 static file_pool_t journal_file_pool = 0;
191
192 void
log_debug_jnl(journal_cjf * jnl,const char * text)193 log_debug_jnl(journal_cjf *jnl, const char *text)
194 {
195 log_debug4("cjf: %s,%p: %s: header SN=[%08x; %08x] F=%08x L=%08x dirty=%i empty=%i",
196 jnl->journal_file_name, jnl->file, text,
197 jnl->serial_begin, jnl->serial_end,
198 jnl->first_page_offset, jnl->page_table_file_offset,
199 journal_cjf_is_dirty(jnl),
200 journal_cjf_isempty(jnl));
201
202 s16 n = jnl->idxt.count;
203
204 if(jnl->last_page.count == 0)
205 {
206 n--;
207 }
208
209 log_debug4("cjf: %s,%p: %s: idxt %3hi/%3hi [%3hi] dirty=%i marked=%i",
210 jnl->journal_file_name, jnl->file, text,
211 jnl->idxt.count, jnl->idxt.size, jnl->idxt.first, (jnl->idxt.dirty)?1:0, (jnl->idxt.marked)?1:0);
212
213 log_debug4("cjf: %s,%p: %s: page: SN=[%08x; %08x] count=%3u size=%3u at=%08x next=%08x ... limit=%08x",
214 jnl->journal_file_name, jnl->file, text,
215 jnl->last_page.serial_start, jnl->last_page.serial_end,
216 jnl->last_page.count,jnl->last_page.size,
217 jnl->last_page.file_offset, jnl->last_page.records_limit,
218 jnl->last_page.file_offset_limit);
219
220 for(s16 idx = 0; idx < n; idx++)
221 {
222 journal_cjf_idxt_tbl_item *item = &jnl->idxt.entries[(jnl->idxt.first + idx) % jnl->idxt.size];
223
224 log_debug4("cjf: %s,%p: %s: idxt[%3i] = %08x %08x", jnl->journal_file_name, jnl->file, text, idx, item->last_serial, item->file_offset);
225 }
226
227 if(jnl->last_page.count == 0)
228 {
229 journal_cjf_idxt_tbl_item *item = &jnl->idxt.entries[(jnl->idxt.first + n) % jnl->idxt.size];
230
231 log_debug4("cjf: %s,%p: %s: idxt[%3i] = [empty] %08x", jnl->journal_file_name, jnl->file, text, n, item->file_offset);
232 }
233 }
234
235 static void
journal_cjf_writelock(journal_cjf * jnl)236 journal_cjf_writelock(journal_cjf *jnl)
237 {
238 #if DEBUG
239 log_debug4("cjf: %s,%p: write lock", jnl->journal_file_name, jnl->file);
240 #endif
241 shared_group_mutex_lock(&jnl->mtx, GROUP_MUTEX_WRITE);
242 }
243
244 static void
journal_cjf_writeunlock(journal_cjf * jnl)245 journal_cjf_writeunlock(journal_cjf *jnl)
246 {
247 #if DEBUG
248 log_debug4("cjf: %s,%p: write unlock", jnl->journal_file_name, jnl->file);
249 #endif
250 shared_group_mutex_unlock(&jnl->mtx, GROUP_MUTEX_WRITE);
251 }
252
253 static void
journal_cjf_readlock(journal_cjf * jnl)254 journal_cjf_readlock(journal_cjf *jnl)
255 {
256 #if DEBUG
257 log_debug4("cjf: %s,%p: read lock", jnl->journal_file_name, jnl->file);
258 #endif
259 shared_group_mutex_lock(&jnl->mtx, GROUP_MUTEX_READ);
260 }
261
262 static void
journal_cjf_readunlock(journal_cjf * jnl)263 journal_cjf_readunlock(journal_cjf *jnl)
264 {
265 #if DEBUG
266 log_debug4("cjf: %s,%p: read unlock", jnl->journal_file_name, jnl->file);
267 #endif
268 shared_group_mutex_unlock(&jnl->mtx, GROUP_MUTEX_READ);
269 }
270
271 bool
journal_cjf_isreadlocked(journal_cjf * jnl)272 journal_cjf_isreadlocked(journal_cjf *jnl)
273 {
274 bool ret = shared_group_mutex_islocked_by(&jnl->mtx, GROUP_MUTEX_READ);
275 return ret;
276 }
277
278 bool
journal_cjf_iswritelocked(journal_cjf * jnl)279 journal_cjf_iswritelocked(journal_cjf *jnl)
280 {
281 bool ret = shared_group_mutex_islocked_by(&jnl->mtx, GROUP_MUTEX_WRITE);
282 return ret;
283 }
284
285 void
journal_cjf_release(journal_cjf * jnl)286 journal_cjf_release(journal_cjf *jnl)
287 {
288 journal_release((journal*)jnl);
289 }
290
291 static journal_cjf* journal_cjf_alloc_default(const u8 *origin, const char *filename);
292
293 static void
journal_cjf_load_idxt(journal_cjf * jnl)294 journal_cjf_load_idxt(journal_cjf *jnl)
295 {
296 if(jnl->idxt.entries != NULL)
297 {
298 return;
299 }
300
301 journal_cjf_idxt_load(jnl);
302
303 if(jnl->idxt.count > 0)
304 {
305 jnl->last_page.file_offset = journal_cjf_idxt_get_last_file_offset(jnl);
306 journal_cjf_page_tbl_header current_page_header;
307 journal_cjf_page_cache_read_header(jnl->file, jnl->last_page.file_offset, ¤t_page_header);
308 jnl->last_page.count = current_page_header.count;
309 jnl->last_page.size = current_page_header.size;
310
311 if(jnl->last_page.file_offset < jnl->first_page_offset)
312 {
313 jnl->last_page.file_offset_limit = jnl->first_page_offset;
314 }
315 else
316 {
317 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
318 }
319
320 if(jnl->idxt.count > 1)
321 {
322 jnl->last_page.serial_start = journal_cjf_idxt_get_last_serial(jnl, jnl->idxt.count - 2);
323 }
324 else
325 {
326 jnl->last_page.serial_start = jnl->serial_begin;
327 }
328 }
329 else
330 {
331 jnl->idxt.dirty = FALSE;
332 jnl->flags |= JOURNAL_CFJ_FLAGS_DIRTY;
333
334 journal_cjf_page_cache_flush(jnl->file);
335
336 journal_cjf_idxt_destroy(jnl);
337
338 jnl->serial_begin = 0;
339 jnl->serial_end = 0;
340
341 jnl->mtx.owner = LOCK_NONE;
342 jnl->mtx.count = 0;
343 jnl->first_page_offset = CJF_HEADER_SIZE;
344 jnl->page_table_file_offset = 0;
345 jnl->last_soa_offset = 0;
346 jnl->file_maximum_size = MAX_U32;
347
348 if(jnl->zone != NULL)
349 {
350 jnl->file_maximum_size = jnl->zone->wire_size >> 1;
351 zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
352 }
353
354 jnl->last_page.file_offset = CJF_HEADER_SIZE;
355 jnl->last_page.count = 0;
356 jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
357 jnl->last_page.serial_start = 0;
358 jnl->last_page.serial_end = 0;
359 jnl->last_page.records_limit = jnl->last_page.file_offset + CJF_SECTION_INDEX_SIZE;
360 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
361
362 #if _BSD_SOURCE || _XOPEN_SOURCE >= 500 || _XOPEN_SOURCE && _XOPEN_SOURCE_EXTENDED || /* Since glibc 2.3.5: */ _POSIX_C_SOURCE >= 200112L
363 file_pool_resize(jnl->file, CJF_HEADER_SIZE);
364 #endif
365 }
366 }
367
368 static int
journal_cjf_create_file(journal_cjf ** jnlp,const u8 * origin,const char * filename)369 journal_cjf_create_file(journal_cjf **jnlp, const u8 *origin, const char *filename)
370 {
371 log_debug3("cjf: %{dnsname}: creating %s", origin, filename);
372
373 int flags = O_RDWR|O_CREAT|O_EXCL|O_CLOEXEC;
374 #ifdef O_NOATIME
375 flags |= O_NOATIME;
376 #endif
377 file_pool_file_t file;
378 ya_result ret;
379 cjf_header hdr;
380
381 file = file_pool_open_ex(journal_file_pool, filename, flags, 0644);
382
383 if(file != NULL)
384 {
385 journal_cjf *jnl = journal_cjf_alloc_default(origin, filename);
386
387 hdr.magic_plus_version = CJF_CJF0_MAGIC;
388 hdr.serial_begin = 0;
389 hdr.serial_end = 0;
390 hdr.first_index_offset = 0;
391 hdr.table_index_offset = 0;
392 hdr.last_soa_offset = 0,
393 hdr.last_page_offset_next = 0;
394 //hdr.last_page_item_count = 0;
395 hdr.flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN; // not dirty
396
397 ssize_t n = file_pool_writefully(file, &hdr, CJF_HEADER_SIZE);
398 if(n < 0)
399 {
400 ret = ERRNO_ERROR;
401 return ret;
402 }
403
404 jnl->file = file;
405
406 *jnlp = jnl;
407
408 return SUCCESS;
409 }
410 else
411 {
412 ret = ERRNO_ERROR;
413 log_err("cjf: %s: failed to create %s: %r", origin, filename, ret);
414
415 *jnlp = NULL;
416
417 return ret;
418 }
419 }
420
421 /**
422 *
423 * Does NOT set the fd field in jnl
424 * MUST return -1 in case of error
425 *
426 * @param jnl
427 * @param create
428 * @return the file descriptor or an error code
429 */
430
431 static int
journal_cjf_init_from_file(journal_cjf ** jnlp,const u8 * origin,const char * filename,bool create)432 journal_cjf_init_from_file(journal_cjf **jnlp, const u8 *origin, const char *filename, bool create)
433 {
434 log_debug3("cjf: %{dnsname}: opening%s %s", origin, (create)?"/creating":"", filename);
435
436 int flags = O_RDWR|O_CLOEXEC;
437 #ifdef O_NOATIME
438 flags |= O_NOATIME;
439 #endif
440 file_pool_file_t file;
441 ya_result ret;
442 bool bad_journal = FALSE;
443 cjf_header hdr;
444
445 file = file_pool_open_ex(journal_file_pool, filename, flags, 0660);
446
447 if(file == NULL)
448 {
449 ret = ERRNO_ERROR;
450 log_debug3("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
451
452 if(create)
453 {
454 ret = journal_cjf_create_file(jnlp, origin, filename);
455 }
456
457 return ret;
458 }
459
460 s64 size = filesize(filename);
461 if(size < CJF_HEADER_SIZE)
462 {
463 bad_journal = TRUE;
464 }
465
466 // look if the journal makes sense
467
468 if(FAIL(ret = file_pool_readfully(file, &hdr, sizeof(hdr))))
469 {
470 ret = ERRNO_ERROR;
471 log_err("cjf: %{dnsname}: could not read header on %s: %r", origin, filename, ret);
472 bad_journal = TRUE;
473 }
474 else if((hdr.magic_plus_version != CJF_CJF0_MAGIC) || ((hdr.flags & JOURNAL_CFJ_FLAGS_MY_ENDIAN) == 0) )
475 {
476 if(hdr.magic_plus_version != CJF_CJF0_MAGIC)
477 {
478 log_err("cjf: %{dnsname}: wrong magic on %s", origin, filename);
479 }
480 else
481 {
482 log_err("cjf: %{dnsname}: wrong endian on %s", origin, filename);
483 }
484
485 bad_journal = TRUE;
486 }
487 else if(hdr.first_index_offset == 0)
488 {
489 bad_journal = TRUE;
490 }
491
492 if(!bad_journal)
493 {
494 // it does makes sense
495
496 // note: DO NOT jnl->file = fd;
497
498 journal_cjf *jnl = journal_cjf_alloc_default(origin, filename);
499
500 jnl->flags = hdr.flags;
501
502 jnl->serial_begin = hdr.serial_begin;
503 jnl->serial_end = hdr.serial_end;
504 jnl->first_page_offset = hdr.first_index_offset;
505 jnl->page_table_file_offset = hdr.table_index_offset;
506 jnl->last_soa_offset = hdr.last_soa_offset;
507
508 jnl->last_page.serial_end = jnl->serial_end;
509 jnl->last_page.records_limit = hdr.last_page_offset_next;
510
511 jnl->file = file;
512
513 log_debug("cjf: %{dnsname}: journal expected to cover serials from %i to %i", jnl->origin, hdr.serial_begin, hdr.serial_end);
514 log_debug("cjf: %{dnsname}: journal table index located at %x%s", jnl->origin, hdr.table_index_offset,
515 (hdr.table_index_offset!=0)?"":", which means it has not been closed properly");
516
517 *jnlp = jnl;
518
519 return SUCCESS;
520 }
521 else
522 {
523 // the journal content is unexpected
524
525 file_pool_close(file);
526 file = NULL;
527
528 char broken_file_path[PATH_MAX];
529
530 if(ISOK(ret = snformat(broken_file_path, sizeof(broken_file_path),"%s.bad-journal", filename)))
531 {
532 bool try_again = create;
533
534 // remove previous bad-journal if any
535 if(unlink(broken_file_path) < 0)
536 {
537 ret = ERRNO_ERROR;
538 if(ret == MAKE_ERRNO_ERROR(ENOENT))
539 {
540 ret = SUCCESS;
541 }
542 else
543 {
544 log_err("cjf: %{dnsname}: unable to delete previous bad journal %s: %r", origin, broken_file_path, ret);
545 try_again = FALSE;
546 }
547 }
548
549 // successfully handled the previous .bad-journal
550
551 if(ISOK(ret))
552 {
553 // rename the journal into bad-journal
554 if(rename(filename, broken_file_path) < 0)
555 {
556 ret = ERRNO_ERROR;
557 log_err("cjf: %{dnsname}: unable to rename %s into %s: %r", origin, filename, broken_file_path, ret);
558
559 if(unlink(filename) < 0)
560 {
561 ret = ERRNO_ERROR;
562 log_err("cjf: %{dnsname}: unable to delete %s: %r", origin, filename, ret);
563 try_again = FALSE;
564 }
565 }
566
567 ret = ZDB_ERROR_ICMTL_NOTFOUND;
568 }
569
570 if(try_again) // we are allowed to create and got no counter-indication
571 {
572 int ret = journal_cjf_create_file(jnlp, origin, filename); // we are in a branch where "create = TRUE"
573
574 return ret;
575 }
576 }
577 else
578 {
579 log_err("cjf: %{dnsname}: %s is a bad journal, please remove it.", origin, filename);
580 }
581 }
582
583 return ret;
584 }
585
586 void
journal_cjf_header_flush(journal_cjf * jnl)587 journal_cjf_header_flush(journal_cjf *jnl)
588 {
589 if(journal_cjf_is_dirty(jnl))
590 {
591 yassert(jnl->file != NULL);
592
593 log_debug("cjf: %s,%p: flushing header SN=[%08x; %08x] F=%08x T=%08x", jnl->journal_file_name, jnl->file,
594 jnl->serial_begin, jnl->serial_end, jnl->first_page_offset, jnl->page_table_file_offset);
595
596 off_t pos;
597
598 if((pos = file_pool_seek(jnl->file, 4, SEEK_SET)) != 4)
599 {
600 log_err("cjf: %s,%p: failed to set file position: %lli instead of %i (%r)", jnl->journal_file_name, jnl->file, pos, 4, ERRNO_ERROR);
601 logger_flush();
602 abort();
603 }
604
605 cjf_header hdr;
606 //hdr.magic_plus_version = 0;
607 hdr.serial_begin = jnl->serial_begin;
608 hdr.serial_end = jnl->serial_end;
609 hdr.first_index_offset = jnl->first_page_offset;
610 hdr.table_index_offset = jnl->page_table_file_offset;
611 hdr.last_soa_offset = jnl->last_soa_offset;
612 hdr.last_page_offset_next = jnl->last_page.records_limit;
613 //hdr.last_page_item_count = jnl->page.count;
614 hdr.flags = jnl->flags;
615
616 file_pool_writefully(jnl->file, &hdr.serial_begin, CJF_HEADER_SIZE - 4);
617
618 journal_cjf_clear_dirty(jnl);
619 }
620 }
621
622 /**
623 *
624 * Removes the first PAGE from the journal.
625 * Adjust the current PAGE limit;
626 *
627 * @param jnl
628 */
629
630 void
journal_cjf_remove_first_page(journal_cjf * jnl)631 journal_cjf_remove_first_page(journal_cjf *jnl)
632 {
633 log_debug_jnl(jnl, "journal_cjf_remove_first_page: BEFORE");
634
635 u32 stored_serial = jnl->serial_begin + 1; // (ensure an error would trigger a flush)
636
637 u8 zt = 0;
638 if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
639 {
640 if(zt == ZT_MASTER)
641 {
642 zdb_zone_info_get_stored_serial(jnl->origin, &stored_serial); // for master only
643
644 if(serial_le(stored_serial, jnl->serial_begin))
645 {
646 log_debug("cjf: %s,%p: journal page %u will be lost, flushing zone first", jnl->journal_file_name, jnl->file, jnl->journal_file_name, jnl->serial_begin);
647 zdb_zone_info_background_store_zone(jnl->origin);
648 }
649 }
650 }
651
652 journal_cjf_page_tbl_header first_page_hdr;
653 journal_cjf_page_cache_read_header(jnl->file, jnl->first_page_offset, &first_page_hdr);
654 if(first_page_hdr.next_page_offset < jnl->first_page_offset)
655 {
656 // this is the last page, of the file, physically
657 jnl->page_table_file_offset = jnl->last_page.records_limit;
658 jnl->idxt.dirty = TRUE;
659 }
660
661 journal_cjf_page_cache_clear(jnl->file, jnl->first_page_offset);
662
663 jnl->serial_begin = journal_cjf_idxt_get_last_serial(jnl, 0);
664 jnl->first_page_offset = journal_cjf_idxt_get_file_offset(jnl, 1);
665
666 ++jnl->idxt.first;
667 --jnl->idxt.count;
668
669 journal_cjf_set_dirty(jnl);
670
671 if(jnl->last_page.file_offset < jnl->first_page_offset)
672 {
673 jnl->last_page.file_offset_limit = jnl->first_page_offset;
674 }
675 else // at or after
676 {
677 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
678 }
679
680 log_debug_jnl(jnl, "journal_cjf_remove_first_page: AFTER");
681
682 log_debug("cjf: %s,%p: first PAGE now at %u (%08x), journal starts with serial %u (%08x", jnl->journal_file_name, jnl->file,
683 jnl->first_page_offset, jnl->first_page_offset, jnl->serial_begin, jnl->serial_begin);
684 }
685
686 /*******************************************************************************
687 *
688 * Index table handling functions
689 *
690 ******************************************************************************/
691
692 /*****************************************************************************/
693
694 static void journal_cjf_writelock(journal_cjf *jnl);
695 static void journal_cjf_writeunlock(journal_cjf *jnl);
696
697 static void journal_cjf_readlock(journal_cjf *jnl);
698 static void journal_cjf_readunlock(journal_cjf *jnl);
699
700 static const char *
journal_cjf_get_format_name()701 journal_cjf_get_format_name()
702 {
703 return JOURNAL_FORMAT_NAME;
704 }
705
706 static u32
journal_cjf_get_format_version()707 journal_cjf_get_format_version()
708 {
709 return VERSION_U32(VERSION_HI,VERSION_LO);
710 }
711
712 static ya_result
journal_cjf_read_soa_record(dns_resource_record * rr,input_stream * ixfr_wire_is)713 journal_cjf_read_soa_record(dns_resource_record *rr, input_stream *ixfr_wire_is)
714 {
715 ya_result return_value;
716
717 if((return_value = dns_resource_record_read(rr, ixfr_wire_is)) <= 0)
718 {
719 /* FAIL or EOF */
720 return return_value;
721 }
722
723 #if DEBUG
724 rdata_desc rdatadesc = {rr->tctr.qtype, rr->rdata_size, rr->rdata};
725 log_debug("cjf: %{dnsname} %{typerdatadesc}", rr->name, &rdatadesc);
726 #endif
727
728 if((rr->tctr.qtype != TYPE_SOA) || (rr->rdata_size > SOA_RDATA_SIZE_MAX))
729 {
730 log_err("cjf: expected SOA record but got %{dnstype} instead", &rr->tctr.qtype);
731
732 return ZDB_JOURNAL_SOA_RECORD_EXPECTED;
733 }
734
735 return return_value;
736 }
737
738 struct journal_cjf_read_ixfr_s
739 {
740 input_stream *ixfr_wire_is;
741 output_stream baos;
742 dns_resource_record rr;
743 u32 serial_from;
744 u32 serial_to;
745 u32 size;
746 bool eof;
747 };
748
749 typedef struct journal_cjf_read_ixfr_s journal_cjf_read_ixfr_s;
750
751 ya_result
journal_cjf_read_ixfr_init(journal_cjf_read_ixfr_s * ixfrinc,input_stream * ixfr_wire_is)752 journal_cjf_read_ixfr_init(journal_cjf_read_ixfr_s *ixfrinc, input_stream *ixfr_wire_is)
753 {
754 ya_result ret;
755 ixfrinc->ixfr_wire_is = ixfr_wire_is;
756 bytearray_output_stream_init_ex(&ixfrinc->baos, NULL, 65536, BYTEARRAY_DYNAMIC);
757 dns_resource_record_init(&ixfrinc->rr);
758 ixfrinc->serial_from = 0;
759 ixfrinc->serial_to = 0;
760 ixfrinc->size = 0;
761 ixfrinc->eof = FALSE;
762
763 ret = journal_cjf_read_soa_record(&ixfrinc->rr, ixfr_wire_is);
764
765 #if DEBUG
766 if(ISOK(ret))
767 {
768 log_debug2("cjf: ---: started with %{dnsrr}", &ixfrinc->rr);
769 }
770 #endif
771
772 return ret;
773 }
774
775 void
journal_cjf_read_ixfr_finalize(journal_cjf_read_ixfr_s * ixfrinc)776 journal_cjf_read_ixfr_finalize(journal_cjf_read_ixfr_s *ixfrinc)
777 {
778 ixfrinc->ixfr_wire_is = NULL;
779
780 dns_resource_record_clear(&ixfrinc->rr);
781 output_stream_close(&ixfrinc->baos);
782
783 ixfrinc->serial_from = 0;
784 ixfrinc->serial_to = 0;
785 ixfrinc->size = 0;
786 }
787
788 /**
789 *
790 * Reads a single page of incremental changes (-SOA ... +SOA ...)
791 *
792 * @param ixfrinc
793 * @return the size of the page (0 if there is nothing to be read), or an error code
794 */
795
796 static ya_result
journal_cjf_read_ixfr_read(journal_cjf_read_ixfr_s * ixfrinc)797 journal_cjf_read_ixfr_read(journal_cjf_read_ixfr_s *ixfrinc)
798 {
799 if(ixfrinc->eof)
800 {
801 ixfrinc->size = 0;
802 return 0;
803 }
804
805 input_stream *ixfr_wire_is = ixfrinc->ixfr_wire_is;
806 output_stream *baos = &ixfrinc->baos;
807 dns_resource_record *rr = &ixfrinc->rr;
808
809 ya_result ret;
810 bool need_another_soa = TRUE;
811
812 bytearray_output_stream_reset(baos);
813
814 // must start by an SOA
815
816 if(rr->tctr.qtype == TYPE_SOA)
817 {
818 ret = rr_soa_get_serial(rr->rdata, rr->rdata_size, &ixfrinc->serial_from);
819 }
820 else
821 {
822 ret = ZDB_JOURNAL_SOA_RECORD_EXPECTED;
823 }
824
825 ixfrinc->size = 0;
826
827 if(ISOK(ret))
828 {
829 for(int idx = 0;; ++idx)
830 {
831 #if DEBUG
832 log_debug2("cjf: ---: %4i: %{dnsrr}", idx, rr);
833 #endif
834 dns_resource_record_write(rr, baos);
835
836 if((ret = dns_resource_record_read(rr, ixfr_wire_is)) <= 0)
837 {
838 if(ret == 0)
839 {
840 if(!need_another_soa)
841 {
842 ixfrinc->size = bytearray_output_stream_size(baos);
843 #if DEBUG
844 log_debug2("cjf: ===: IXFR incremental change size: %i", ixfrinc->size);
845 #endif
846 ret = ixfrinc->size;
847 }
848 else
849 {
850 #if DEBUG
851 log_debug2("cjf: ===: still expected an SOA");
852 #endif
853 // SOA expected
854 ret = ZDB_JOURNAL_SOA_RECORD_EXPECTED;
855 }
856
857 ixfrinc->eof = TRUE;
858 }
859 else
860 {
861 #if DEBUG
862 log_debug2("cjf: ===: failed to read the next record: %r", ret);
863 #endif
864 }
865
866 break;
867 }
868
869 if(rr->tctr.qtype == TYPE_SOA)
870 {
871 if(need_another_soa)
872 {
873 if(FAIL(ret = rr_soa_get_serial(rr->rdata, rr->rdata_size, &ixfrinc->serial_to)))
874 {
875 #if DEBUG
876 log_debug2("cjf: ===: failed parse serial from record: %r", ret);
877 #endif
878 break;
879 }
880
881 need_another_soa = FALSE;
882 }
883 else
884 {
885 // another page starts here
886 // this record will written for the next page
887
888 ixfrinc->size = bytearray_output_stream_size(baos);
889 #if DEBUG
890 log_debug2("cjf: ===: IXFR incremental change size: %i (followed ...)", ixfrinc->size);
891 #endif
892 ret = ixfrinc->size;
893 break;
894 }
895 }
896 }
897 }
898
899 return ret;
900 }
901
902 /**
903 * The caller will take action that will end up removing the first page.
904 * Either explicitly, either overwriting it (ie: looping).
905 *
906 * This function ensures that it's OK to do so.
907 *
908 * Returns:
909 *
910 * 0 if it's OK to do so, and no actions were taken,
911 * 1 if it's OK to do so, but the zone needed to be stored
912 * or an error code.
913 *
914 * @param jnl
915 *
916 * @return the state of the operation
917 *
918 */
919
920 static ya_result
journal_cjf_append_ixfr_stream_first_page_removal(journal_cjf * jnl)921 journal_cjf_append_ixfr_stream_first_page_removal(journal_cjf *jnl)
922 {
923 // the caller will remove first page to make room, prepare for it
924
925 ya_result ret;
926 u32 zone_stored_serial;
927
928 // get the serial of the stored zone
929
930 if(FAIL(ret = zdb_zone_info_get_stored_serial(jnl->origin, &zone_stored_serial)))
931 {
932 log_warn("cjf: %{dnsname}: could not get the serial of the stored zone: %r", jnl->origin, ret);
933 return ret;
934 }
935
936 u8 zt = 0;
937 if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
938 {
939 if(zt == ZT_SLAVE)
940 {
941 u32 ts = jnl->zone->axfr_timestamp;
942 u32 sr = jnl->zone->axfr_serial;
943 if(ts > 1)
944 {
945 if(serial_gt(sr, zone_stored_serial))
946 {
947 zone_stored_serial = sr;
948 }
949 }
950 }
951 }
952
953 // get the page of the serial
954
955 if(FAIL(ret = journal_cjf_idxt_get_page_offset_from_serial(jnl, zone_stored_serial, NULL)))
956 {
957 if(serial_le(jnl->serial_end, zone_stored_serial))
958 {
959 log_debug("cjf: %{dnsname}: no need to store the zone again as it's already %i steps further", jnl->origin, zone_stored_serial - jnl->serial_end);
960 return 0;
961 }
962 else
963 {
964 log_warn("cjf: %{dnsname}: could not get page of serial %u: %r", jnl->origin, zone_stored_serial, ret);
965 return ret;
966 }
967 }
968
969 // ret is the index of the page, if it is 0 we may need to save the current zone
970
971 bool need_to_store_before_removing_first_page = (ret == 0);
972
973 log_debug("cjf: %{dnsname}: zone currently stored up to serial %i, located on page %i of the journal", jnl->origin, zone_stored_serial, ret);
974
975 if(need_to_store_before_removing_first_page)
976 {
977 // we are about to destroy the page of the currently stored serial AND
978 // there are steps remaining to be safe
979
980 log_warn("cjf: %{dnsname}: need to store the zone right now, consider increasing the journal size", jnl->origin);
981
982 zdb_zone *zone = (zdb_zone*)jnl->zone;
983
984 // the zone at this point is supposed to be locked
985 // either simply with a simple reader
986 // either doubly with something and a simple reader
987 // if simply : do nothing
988 // if doubly : with the simple reader : exchange them
989 // with anything else : it cannot ever work
990
991 u8 owner = zone->lock_owner;
992 u8 reserved_owner = zone->lock_reserved_owner;
993 if(owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
994 {
995 zdb_zone_lock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
996 ret = zdb_zone_info_store_locked_zone(jnl->origin);
997 zdb_zone_unlock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
998 }
999 else if(owner == ZDB_ZONE_MUTEX_NOBODY)
1000 {
1001 zdb_zone_lock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1002 ret = zdb_zone_info_store_locked_zone(jnl->origin);
1003 zdb_zone_unlock(zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1004 }
1005 else if(reserved_owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
1006 {
1007 zdb_zone_exchange_locks(zone, owner, ZDB_ZONE_MUTEX_SIMPLEREADER);
1008 ret = zdb_zone_info_store_locked_zone(jnl->origin);
1009 zdb_zone_exchange_locks(zone, ZDB_ZONE_MUTEX_SIMPLEREADER, owner);
1010 }
1011 else
1012 {
1013 ret = ERROR; // obsolete
1014 }
1015
1016 if(FAIL(ret))
1017 {
1018 log_warn("cjf: %{dnsname}: cannot store the zone: %r", jnl->origin, ret);
1019 return ret;
1020 }
1021
1022 return 1; // try again
1023 }
1024 else
1025 {
1026 return 0; // continue
1027 }
1028 }
1029
journal_cjf_get_space_left_until_need_storage_page(journal_cjf * jnl)1030 static s64 journal_cjf_get_space_left_until_need_storage_page(journal_cjf *jnl)
1031 {
1032 ya_result ret;
1033 u32 zone_stored_serial;
1034
1035 if(FAIL(ret = zdb_zone_info_get_stored_serial(jnl->origin, &zone_stored_serial)))
1036 {
1037 log_warn("cjf: %{dnsname}: could not get teh serial of the stored zone: %r", jnl->origin, ret);
1038
1039 // save asap
1040
1041 return 0;
1042 }
1043
1044 u8 zt = 0;
1045 if(ISOK(zdb_zone_info_get_zone_type(jnl->origin, &zt)))
1046 {
1047 if(zt == ZT_SLAVE)
1048 {
1049 u32 ts = jnl->zone->axfr_timestamp;
1050 u32 sr = jnl->zone->axfr_serial;
1051 if(ts > 1)
1052 {
1053 if(serial_gt(sr, zone_stored_serial))
1054 {
1055 zone_stored_serial = sr;
1056 }
1057 }
1058 }
1059 }
1060
1061 // get the page of the serial
1062
1063 if(FAIL(ret = journal_cjf_idxt_get_page_offset_from_serial(jnl, zone_stored_serial, NULL)))
1064 {
1065 if(serial_le(jnl->serial_end, zone_stored_serial))
1066 {
1067 log_debug("cjf: %{dnsname}: no need to store the zone again as it's already %i steps further", jnl->origin, zone_stored_serial - jnl->serial_end);
1068
1069 // the journal is only there for the slaves, it could be completely replaced
1070
1071 return jnl->file_maximum_size;
1072 }
1073 else
1074 {
1075 log_warn("cjf: %{dnsname}: could not get page of serial %u: %r", jnl->origin, zone_stored_serial, ret);
1076
1077 // save asap
1078
1079 return 0;
1080 }
1081 }
1082
1083 const journal_cjf_idxt_tbl_item* need_storage = journal_cjf_idxt_get_entry(jnl, ret);
1084
1085 if(jnl->last_page.file_offset == need_storage->file_offset)
1086 {
1087 // we are on the page : we basically have the size of our page minus the file size
1088
1089 return MAX((s32)(jnl->file_maximum_size - (jnl->last_page.records_limit - jnl->last_page.file_offset)), 0);
1090 }
1091 else if(jnl->last_page.file_offset < need_storage->file_offset)
1092 {
1093 // we have everything until that page
1094
1095 return MAX((s32)(need_storage->file_offset - jnl->last_page.records_limit), 0);
1096 }
1097 else // if(jnl->last_page.file_offset > need_storage->file_offset)
1098 {
1099 // we have the remaining space until the end of the file plus the offset of the page (minus the header)
1100
1101 return MAX((s32)(jnl->file_maximum_size - jnl->last_page.records_limit), 0) + need_storage->file_offset;
1102 }
1103 }
1104
1105 static ya_result
journal_cjf_append_ixfr_stream_per_page(journal * jh,input_stream * ixfr_wire_is,bool is_slave)1106 journal_cjf_append_ixfr_stream_per_page(journal *jh, input_stream *ixfr_wire_is, bool is_slave)
1107 {
1108 journal_cjf *jnl = (journal_cjf*)jh;
1109 ya_result ret;
1110
1111 log_debug("cjf: %s,%p: append IXFR (master)", jnl->journal_file_name, jnl->file);
1112
1113 // ensure the zone locks are usable : locked by the reader, by nobody, or the reader is a reserved owner
1114 if(jnl->zone != NULL)
1115 {
1116 zdb_zone *zone = (zdb_zone*)jnl->zone;
1117 u8 owner = zone->lock_owner;
1118 u8 reserved_owner = zone->lock_reserved_owner;
1119 if(
1120 !(
1121 (owner == ZDB_ZONE_MUTEX_SIMPLEREADER) ||
1122 (owner == ZDB_ZONE_MUTEX_NOBODY) ||
1123 (reserved_owner == ZDB_ZONE_MUTEX_SIMPLEREADER)
1124 )
1125 )
1126 {
1127 log_err("cjf: %s,%p: append IXFR (master) cannot happen because the zone locks are not set properly", jnl->journal_file_name, jnl->file);
1128 return ERROR;
1129 }
1130 }
1131
1132 int written_pages = 0;
1133
1134 dns_resource_record rr;
1135 dns_resource_record_init(&rr);
1136
1137 output_stream os;
1138 output_stream_set_void(&os); // very important
1139
1140 journal_cjf_read_ixfr_s ixfrinc;
1141 journal_cjf_read_ixfr_init(&ixfrinc, ixfr_wire_is);
1142
1143 journal_cjf_writelock(jnl);
1144
1145 for(;;)
1146 {
1147 ret = journal_cjf_read_ixfr_read(&ixfrinc);
1148
1149 if(ret <= 0)
1150 {
1151 journal_cjf_page_output_stream_cancel(&os);
1152
1153 if(ret == 0)
1154 {
1155 log_info("cjf: %{dnsname}: no incremental changes remaining", jnl->origin);
1156 }
1157 else
1158 {
1159 log_err("cjf: %{dnsname}: failed to read changes: %r", jnl->origin, ret);
1160 }
1161 break;
1162 }
1163
1164 // else records have been read
1165
1166 yassert((ixfrinc.serial_from == ixfrinc.serial_to) || (ixfrinc.size != 0));
1167
1168 log_info("cjf: %{dnsname}: incremental changes read from %u to %u (%u bytes)", jnl->origin, ixfrinc.serial_from, ixfrinc.serial_to, ixfrinc.size);
1169
1170 bool journal_is_empty = journal_cjf_isempty(jnl);
1171
1172 if(!journal_is_empty)
1173 {
1174 // if the journal is not empty, ensure the page follows the last journal page
1175
1176 if(serial_lt(ixfrinc.serial_from, jnl->serial_end))
1177 {
1178 log_info("cjf: %{dnsname}: ignoring changes before serial %u", jnl->origin, jnl->serial_end);
1179
1180 continue; // read next page
1181 }
1182 else if(serial_gt(ixfrinc.serial_from, jnl->serial_end))
1183 {
1184 log_warn("cjf: %{dnsname}: missing changes between serials %u and %u", jnl->origin, jnl->serial_end, ixfrinc.serial_from);
1185
1186 break; // full stop
1187 }
1188 }
1189 else
1190 {
1191 // create a journal with one entry
1192
1193 journal_cjf_idxt_create(jnl, 1);
1194 }
1195
1196 /***/
1197
1198 // reserve the known size of this single page with the serial range
1199 // write the page
1200
1201 /***/
1202
1203 journal_cjf_append_ixfr_stream_master_accum_tryagain:
1204
1205 journal_cjf_page_output_stream_reopen(&os, jnl);
1206
1207 // the file is cycling, so we also need to see if we are writing at
1208 // a position before the first page's and thus risking to overwrite it
1209
1210 s64 available = 0;
1211
1212 while(journal_cjf_page_current_output_stream_may_overwrite(jnl))
1213 {
1214 // if total available is smaller than half the file, the division will be >= 2
1215
1216 s64 total_available = journal_cjf_get_space_left_until_need_storage_page(jnl);
1217
1218 // should not store in background. Handle it first-hand (maybe postpone the update) ... (obsolete)
1219
1220 if( ((total_available > 0) && ((jnl->file_maximum_size / total_available) >= 2)) || (total_available == 0))
1221 {
1222 // if not writing already, then write
1223 // 0 = not saving, 1 = saving, <0 = error
1224 if(zdb_zone_info_background_store_in_progress(jnl->origin) != 1)
1225 {
1226 zdb_zone_info_background_store_zone(jnl->origin);
1227 }
1228 }
1229
1230 available = jnl->first_page_offset - jnl->last_page.records_limit;
1231
1232 if(available >= ixfrinc.size)
1233 {
1234 log_debug("cjf: %{dnsname}: %lli >= %i storage available before overwriting the first page", jnl->origin, available, ixfrinc.size);
1235 break;
1236 }
1237
1238 if(FAIL(ret = journal_cjf_append_ixfr_stream_first_page_removal(jnl)))
1239 {
1240 break;
1241 }
1242
1243 if(ret == 1)
1244 {
1245 continue;
1246 }
1247
1248 journal_cjf_remove_first_page(jnl);
1249 } // while may overwrite
1250
1251 if(FAIL(ret))
1252 {
1253 journal_cjf_page_output_stream_cancel(&os);
1254 break;
1255 }
1256
1257 // available space between the first available byte to write records and
1258 // the file size limit
1259
1260 if(!journal_cjf_page_current_output_stream_may_overwrite(jnl))
1261 {
1262 s64 total_available = journal_cjf_get_space_left_until_need_storage_page(jnl);
1263
1264 if( ((total_available > 0) && ((jnl->file_maximum_size / total_available) >= 2)) || (total_available == 0))
1265 {
1266 // if not writing already, then write
1267 // 0 = not saving, 1 = saving, <0 = error
1268 if(zdb_zone_info_background_store_in_progress(jnl->origin) != 1)
1269 {
1270 zdb_zone_info_background_store_zone(jnl->origin);
1271 }
1272 }
1273
1274 // space available until the current limit of the page (file size or first page offset)
1275 available = journal_cjf_get_last_page_available_space_left(jnl);
1276
1277 if(available >= ixfrinc.size)
1278 {
1279 log_debug("cjf: %{dnsname}: %lli >= %i storage available in this page", jnl->origin, available, ixfrinc.size);
1280 }
1281 else
1282 {
1283 // not enough room, but can we handle it ?
1284 //
1285 // if there is only one NON-EMPTY page in the journal,
1286 // cut it and create a new one
1287 // if there are at least two NON-EMPTY pages and the available space between them is big enough,
1288 // cut the last page
1289 // remove the first
1290 // else just complain about the journal size and continue
1291
1292 int page_count = journal_cjf_idxt_get_page_count(jnl);
1293 bool last_page_empty = journal_cjf_page_line_count(jnl) == 0;
1294
1295 yassert(page_count > 0);
1296
1297 if(page_count == 1)
1298 {
1299 if(!last_page_empty)
1300 {
1301 log_debug("cjf: %{dnsname}: one page: append another page", jnl->origin);
1302
1303 // cut and proceed, as we will probably want to roll on the next update
1304
1305 journal_cjf_page_output_stream_cancel(&os);
1306 journal_cjf_idxt_append_page(jnl);
1307 output_stream_close(&os);
1308 output_stream_set_void(&os); // very important
1309
1310 //journal_cjf_page_output_stream_reopen(&os, jnl);
1311
1312 // and loop
1313
1314 ret = SUCCESS;
1315
1316 goto journal_cjf_append_ixfr_stream_master_accum_tryagain;
1317 }
1318 else
1319 {
1320 log_debug("cjf: %{dnsname}: one empty page: write on it", jnl->origin);
1321 // just proceed
1322 }
1323 }
1324 else
1325 {
1326 if(!last_page_empty)
1327 {
1328 u32 available_from_beginning = jnl->last_page.file_offset - CJF_PAGE_SIZE_IN_BYTE - CJF_HEADER_SIZE;
1329
1330 if(ixfrinc.size <= available_from_beginning)
1331 {
1332 // for a slave only, ensure the journal has been read before going further.
1333
1334 if(is_slave)
1335 {
1336 // page_count > 1
1337
1338 u32 page_1_serial_from = journal_cjf_idxt_get_page_serial_from_index(jnl, 1);
1339 u32 starting_zone_serial = page_1_serial_from - 1;
1340 zdb_zone_lock((zdb_zone*)jnl->zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1341 /*ret = */zdb_zone_getserial((zdb_zone*)jnl->zone, &starting_zone_serial); // zone is locked
1342 zdb_zone_unlock((zdb_zone*)jnl->zone, ZDB_ZONE_MUTEX_SIMPLEREADER);
1343 // compare with start serial of second page
1344 if(serial_lt(starting_zone_serial, page_1_serial_from))
1345 {
1346 // cutting the first page will break continuity :stop reading IXFR now, apply the journal and try later.
1347 journal_cjf_page_output_stream_cancel(&os);
1348 goto journal_cjf_append_ixfr_stream_master_accum_exit;
1349 }
1350 }
1351
1352 log_debug("cjf: %{dnsname}: %u of %u bytes available on a loop: loop", jnl->origin, available_from_beginning, ixfrinc.size);
1353
1354 // cutting now will allow to loop
1355
1356 if(FAIL(ret = journal_cjf_append_ixfr_stream_first_page_removal(jnl)))
1357 {
1358 // could not remove first page
1359 break;
1360 }
1361
1362 journal_cjf_page_output_stream_cancel(&os);
1363 u32 tmp = jnl->file_maximum_size;
1364 jnl->file_maximum_size = 0; // force the loop (thus removing the first page)
1365 journal_cjf_idxt_append_page(jnl);
1366 jnl->file_maximum_size = tmp;
1367 output_stream_close(&os);
1368 output_stream_set_void(&os); // very important
1369 journal_cjf_page_output_stream_reopen(&os, jnl);
1370 }
1371 else
1372 {
1373 log_debug("cjf: %{dnsname}: %u/%u bytes available on a loop: continue", jnl->origin, available_from_beginning, ixfrinc.size);
1374
1375 // do a cut to allow to loop soon
1376
1377 journal_cjf_page_output_stream_cancel(&os);
1378 u32 tmp = jnl->file_maximum_size;
1379 jnl->file_maximum_size = MAX_U32;
1380 journal_cjf_idxt_append_page(jnl);
1381 jnl->file_maximum_size = tmp;
1382 output_stream_close(&os);
1383 output_stream_set_void(&os); // very important
1384 journal_cjf_page_output_stream_reopen(&os, jnl);
1385 }
1386 }
1387 else
1388 {
1389 // just proceed
1390 log_debug("cjf: %{dnsname}: last page is empty: write on it", jnl->origin);
1391 }
1392 }
1393 }
1394 }
1395
1396 // write the ixfr increment in the page
1397
1398 input_stream bais;
1399 bytearray_input_stream_init_const(&bais, bytearray_output_stream_buffer(&ixfrinc.baos), bytearray_output_stream_size(&ixfrinc.baos));
1400 for(;;)
1401 {
1402 if((ret = dns_resource_record_read(&rr, &bais)) <= 0)
1403 {
1404 if(ret != 0)
1405 {
1406 log_err("cjf: %{dnsname}: error re-reading record: %r", jnl->origin, ret);
1407 }
1408 break;
1409 }
1410
1411 #if DEBUG
1412 log_debug("cjf: %{dnsname}: writing %{dnsrr}", jnl->origin, &rr);
1413 #endif
1414
1415 if(FAIL(ret = journal_cfj_page_output_stream_write_resource_record(&os, &rr)))
1416 {
1417 log_err("cjf: %{dnsname}: could not store record: %r", jnl->origin, ret);
1418 break;
1419 }
1420 }
1421 input_stream_close(&bais);
1422
1423 if(ISOK(ret))
1424 {
1425 if(journal_is_empty)
1426 {
1427 jnl->serial_begin = ixfrinc.serial_from;
1428 journal_cjf_clear_empty(jnl);
1429 }
1430
1431 jnl->serial_end = ixfrinc.serial_to;
1432 journal_cjf_set_dirty(jnl);
1433
1434 ++written_pages;
1435 journal_cjf_page_output_stream_next(&os);
1436
1437 // if we wrote records after the expected position for the IDXT, move the IDXT position
1438
1439 if(jnl->last_page.records_limit > jnl->page_table_file_offset)
1440 {
1441 jnl->page_table_file_offset = jnl->last_page.records_limit;
1442 jnl->idxt.dirty = TRUE;
1443 journal_cjf_set_dirty(jnl);
1444 }
1445 }
1446 else
1447 {
1448 journal_cjf_page_output_stream_cancel(&os);
1449 break;
1450 }
1451 }
1452
1453 journal_cjf_append_ixfr_stream_master_accum_exit:
1454 if(os.data != NULL)
1455 {
1456 output_stream_close(&os);
1457 }
1458
1459 journal_cjf_read_ixfr_finalize(&ixfrinc);
1460 dns_resource_record_clear(&rr);
1461
1462 if(written_pages > 0)
1463 {
1464 journal_cjf_page_cache_flush(jnl->file);
1465 journal_cjf_header_flush(jnl);
1466 }
1467
1468 journal_cjf_writeunlock(jnl);
1469
1470 if(ISOK(ret))
1471 {
1472 log_info("cjf: %{dnsname}: added %i incremental changes to the journal", jnl->origin, written_pages);
1473 ret = TYPE_IXFR;
1474 }
1475 else
1476 {
1477 log_err("cjf: %{dnsname}: append IXFR (master) failed with: %r", jnl->origin, ret);
1478 }
1479
1480 return ret;
1481 }
1482
1483 static ya_result
journal_cjf_append_ixfr_stream(journal * jh,input_stream * ixfr_wire_is)1484 journal_cjf_append_ixfr_stream(journal *jh, input_stream *ixfr_wire_is)
1485 {
1486 u8 zt;
1487 journal_cjf *jnl = (journal_cjf*)jh;
1488 ya_result ret = zdb_zone_info_get_zone_type(jnl->origin, &zt);
1489 if(ISOK(ret))
1490 {
1491 switch(zt)
1492 {
1493 case ZT_MASTER:
1494 ret = journal_cjf_append_ixfr_stream_per_page(jh, ixfr_wire_is, FALSE);
1495 break;
1496 case ZT_SLAVE:
1497 ret = journal_cjf_append_ixfr_stream_per_page(jh, ixfr_wire_is, TRUE);
1498 break;
1499 default:
1500 ret = ERROR; // obsolete
1501 break;
1502 }
1503 }
1504 return ret;
1505 }
1506
1507 /******************************************************************************
1508 *
1509 * Journal Input Stream
1510 * This one returns and IXFR stream
1511 *
1512 ******************************************************************************/
1513
1514 #define JCJFISDT_TAG 0x54445349464a434a
1515
1516 struct journal_cjf_input_stream_data
1517 {
1518 journal_cjf *jnl;
1519
1520 file_pool_file_t file;
1521 u32 available;
1522
1523 u32 serial_from;
1524 u32 page_next; // DEBUG
1525
1526 u16 idxt_index;
1527 u16 idxt_size;
1528
1529 u16 todo_soa_record_size;
1530 bool first_stream;
1531
1532 u8* todo_soa_record;
1533 };
1534
1535 typedef struct journal_cjf_input_stream_data journal_cjf_input_stream_data;
1536
1537 static ya_result
journal_cjf_input_stream_read(input_stream * stream,void * buffer_,u32 len)1538 journal_cjf_input_stream_read(input_stream* stream, void *buffer_, u32 len)
1539 {
1540 journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)stream->data;
1541 u8 *buffer = (u8*)buffer_;
1542 const u8 *base = buffer;
1543 const u8 *limit = &buffer[len];
1544 intptr n;
1545 ya_result ret = 0;
1546
1547 journal_cjf *jnl = data->jnl;
1548
1549 log_debug("cjf: %s,%p: input: reading %u/%u bytes, pos is %lli", jnl->journal_file_name, jnl->file,
1550 len, data->available, file_pool_seek(data->file, 0, SEEK_CUR));
1551
1552 // while there is still room in the output buffer
1553
1554 while((n = limit - buffer) > 0)
1555 {
1556 // if there is no data ready on input, fetch some more
1557
1558 if(data->available == 0)
1559 {
1560 // get the next one
1561
1562 if(data->idxt_index == data->idxt_size)
1563 {
1564 // EOF : we were at the last index in the IDXT
1565 break;
1566 }
1567
1568 // get the offset of the current PAGE table
1569
1570 u32 page_offset = journal_cjf_idxt_get_file_offset(data->jnl, data->idxt_index);
1571 u32 stream_offset;
1572 u32 stream_limit_offset;
1573
1574 // look for the first SOA requested
1575
1576 if(!data->first_stream)
1577 {
1578 // we are already streaming, the XFR stream starts at the end of the PAGE (4096 bytes until next version of the journal)
1579
1580 stream_offset = page_offset + CJF_PAGE_SIZE_IN_BYTE;
1581 }
1582 else
1583 {
1584 // the starting stream offset is obtained through a bit more work
1585
1586 if(FAIL(ret = journal_cjf_page_get_stream_offset_from_serial(data->jnl, data->idxt_index, data->serial_from, &stream_offset)))
1587 {
1588 return ret;
1589 }
1590
1591 data->first_stream = FALSE;
1592 }
1593
1594 journal_cjf_page_tbl_header page_header;
1595 journal_cjf_page_cache_read_header(data->jnl->file, page_offset, &page_header);
1596
1597 if(page_header.count == 0)
1598 {
1599 // empty page, proably not flushed
1600 break;
1601 }
1602
1603 stream_limit_offset = page_header.stream_end_offset;
1604
1605 // we know where to start ...
1606
1607 data->idxt_index++;
1608
1609 (void)stream_limit_offset;
1610
1611 #if DEBUG
1612 if(stream_limit_offset == 0)
1613 {
1614 log_err("impossible limit value read from the journal");
1615 journal_cjf_page_cache_read_header(data->jnl->file, page_offset, &page_header);
1616 }
1617 #endif
1618
1619 yassert(stream_limit_offset != 0);
1620 yassert(stream_limit_offset > page_offset);
1621
1622 data->available = page_header.stream_end_offset - stream_offset;
1623 data->page_next = page_header.next_page_offset;
1624
1625 if(file_pool_seek(data->file, stream_offset, SEEK_SET) < 0)
1626 {
1627 return ERRNO_ERROR;
1628 }
1629 }
1630
1631 n = MIN(n, data->available);
1632
1633 if(FAIL(ret = file_pool_readfully(data->file, buffer, n)))
1634 {
1635 return ret;
1636 }
1637
1638 data->available -= n;
1639 buffer += n;
1640 }
1641
1642 return buffer - base;
1643 }
1644
1645 static ya_result
journal_cjf_input_stream_skip(input_stream * is,u32 len)1646 journal_cjf_input_stream_skip(input_stream* is, u32 len)
1647 {
1648 u8 tmp[512];
1649
1650 journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)is->data;
1651 journal_cjf *jnl = data->jnl;
1652 log_debug("cjf: %s,%p: input: skipping %u bytes", jnl->journal_file_name, jnl->file, len);
1653
1654 while(len > 0)
1655 {
1656 ya_result ret;
1657 u32 n = MIN(len, sizeof(tmp));
1658 if(FAIL(ret = journal_cjf_input_stream_read(is, tmp, n)))
1659 {
1660 return ret;
1661 }
1662
1663 len -= n;
1664 }
1665
1666 return len;
1667 }
1668
1669 static void
journal_cjf_input_stream_close(input_stream * is)1670 journal_cjf_input_stream_close(input_stream* is)
1671 {
1672 journal_cjf_input_stream_data *data = (journal_cjf_input_stream_data*)is->data;
1673
1674 log_debug("cjf: %s,%p: input: close (%p)", data->jnl->journal_file_name, data->jnl->file, data->file);
1675 journal_cjf_readunlock(data->jnl);
1676 journal_cjf_release(data->jnl);
1677 file_pool_close(data->file);
1678 ZFREE_OBJECT(data);
1679
1680 input_stream_set_void(is);
1681 }
1682
1683 static const input_stream_vtbl journal_cjf_input_stream_vtbl =
1684 {
1685 journal_cjf_input_stream_read,
1686 journal_cjf_input_stream_skip,
1687 journal_cjf_input_stream_close,
1688 "journal_cjf_input_stream"
1689 };
1690
1691 /*
1692 * the last_soa_rr is used for IXFR transfers (it has to be a prefix & suffix to the returned stream)
1693 */
1694
1695 static ya_result
journal_cjf_get_ixfr_stream_at_serial(journal * jh,u32 serial_from,input_stream * out_input_stream,dns_resource_record * out_last_soa_rr)1696 journal_cjf_get_ixfr_stream_at_serial(journal *jh, u32 serial_from, input_stream *out_input_stream, dns_resource_record *out_last_soa_rr)
1697 {
1698 journal_cjf *jnl = (journal_cjf*)jh;
1699
1700 log_debug("cjf: %s,%p: get IXFR stream at serial %i", jnl->journal_file_name, jnl->file, serial_from);
1701
1702 journal_cjf_readlock(jnl);
1703
1704 if(serial_lt(serial_from, jnl->serial_begin) || serial_ge(serial_from, jnl->serial_end))
1705 {
1706 if(serial_from == jnl->serial_end)
1707 {
1708 log_debug("cjf: %s,%p: the journal ends at %i, returning empty stream", jnl->journal_file_name, jnl->file, serial_from);
1709 journal_cjf_readunlock(jnl);
1710 empty_input_stream_init(out_input_stream);
1711 return SUCCESS; // 0
1712 }
1713 else
1714 {
1715 log_debug("cjf: %s,%p: the journal ends at %i, returning empty stream", jnl->journal_file_name, jnl->file, serial_from);
1716 journal_cjf_readunlock(jnl);
1717 #if DEBUG
1718 logger_flush();
1719 #endif
1720 return ZDB_JOURNAL_SERIAL_OUT_OF_KNOWN_RANGE;
1721 }
1722 }
1723
1724 ya_result ret;
1725 dns_resource_record rr;
1726
1727 dns_resource_record_init(&rr);
1728
1729 // increment the reference count of the journal
1730 // lock the range in the file so it cannot be overwritten
1731 // create a stream that know where to start, where to end
1732 // it has to first send the last SOA
1733 // then to send the start
1734 // then to send the last SOA again
1735
1736 if(FAIL(ret = journal_cjf_idxt_get_page_index_from_serial(jnl, serial_from)))
1737 {
1738 journal_cjf_readunlock(jnl);
1739
1740 return ret;
1741 }
1742
1743 yassert(ret < MAX_U16);
1744
1745 u16 idxt_index = (u16)ret;
1746
1747 journal_cjf_input_stream_data *data;
1748 ZALLOC_OBJECT_OR_DIE(data, journal_cjf_input_stream_data, JCJFISDT_TAG);
1749 journal_acquire((journal*)jnl);
1750 data->jnl = jnl;
1751
1752 data->file = file_pool_open_ex(journal_file_pool, jnl->journal_file_name, O_RDONLY|O_CLOEXEC, 0660);
1753
1754 if(data->file == NULL)
1755 {
1756 // the journal doess not exist (anymore ?)
1757 ZFREE_OBJECT(data);
1758
1759 journal_cjf_readunlock(jnl);
1760 journal_cjf_release(jnl);
1761
1762 return MAKE_ERRNO_ERROR(ENOENT);
1763 }
1764
1765 data->serial_from = serial_from;
1766
1767 if(out_last_soa_rr != NULL)
1768 {
1769 yassert(jnl->last_soa_offset != 0);
1770 // read the last SOA
1771
1772 size_t from = ~0;
1773
1774 file_pool_tell(data->file, &from);
1775
1776 file_pool_seek(data->file, jnl->last_soa_offset, SEEK_SET);
1777
1778 input_stream tmp;
1779 file_pool_file_input_stream_init(&tmp, data->file);
1780 ret = dns_resource_record_read(out_last_soa_rr, &tmp);
1781 file_pool_file_input_stream_detach(&tmp);
1782
1783 file_pool_seek(data->file, from, SEEK_SET);
1784
1785 if(FAIL(ret))
1786 {
1787 journal_cjf_readunlock(jnl);
1788 journal_cjf_release(jnl);
1789
1790 log_err("cjf: %s,%p: unable to read the SOA for serial %u at position %u: %r", jnl->journal_file_name, jnl->file, serial_from, jnl->last_soa_offset, ret);
1791 ZFREE_OBJECT(data);
1792 return ret;
1793 }
1794 }
1795
1796 data->idxt_index = idxt_index;
1797 data->idxt_size = jnl->idxt.count;
1798 data->available = 0;
1799
1800 data->first_stream = TRUE;
1801
1802 out_input_stream->data = data;
1803 out_input_stream->vtbl = &journal_cjf_input_stream_vtbl;
1804
1805 return ret;
1806
1807 /*
1808 * In page_begin.file_offset, we get the first PAGE table
1809 *
1810 * That table may chain to a next PAGE, and so on and so forth
1811 * While this is happening, every stream between offsets:
1812 *
1813 * page_begin.file_offset + CJF_SECTION_INDEX_SIZE
1814 *
1815 * and
1816 *
1817 * @(page_begin.file_offset + 4)
1818 *
1819 * is to be sent by the stream
1820 *
1821 * When @(page_begin.file_offset + 4) is 0, it is the last PAGE
1822 *
1823 * Note that @(page_begin.file_offset + 4) is cached in the IDXT entries
1824 *
1825 * Every PAGE table but the last one has exactly CJF_SECTION_INDEX_SLOT_COUNT items
1826 *
1827 * jnl->page.count contains the count of items of the current (not full) PAGE
1828 * jnl->page.offset_next contains the supremum of input after the last PAGE
1829 *
1830 * This means a journal has to be fully initialised before being read (it was not the case for an IX file)
1831 *
1832 * The content of the PAGE itself is not required. Only the DNS part matters.
1833 *
1834 * All this also means a journal has to be flushed for its DNS on disk (since the file has to be opened separately because a cloned fd shares the file pointer)
1835 *
1836 * A range-locking mechanism is clearly needed. It should only be capable of locking up to two ranges (covers all cases).
1837 *
1838 * So here, in summary, return a stream that is linked to the journal
1839 *
1840 * It will start at offset:
1841 *
1842 * idxt[ret].file_offset + CJF_SECTION_INDEX_SIZE
1843 *
1844 * until:
1845 *
1846 * idxt[ret + 1].file_offset or page.offset_next
1847 *
1848 * and continue that way for every ret < idxt.count
1849 *
1850 */
1851 }
1852
1853 static ya_result
journal_cjf_get_first_serial(journal * jh,u32 * serial)1854 journal_cjf_get_first_serial(journal *jh, u32 *serial)
1855 {
1856 ya_result ret = BUFFER_WOULD_OVERFLOW;
1857 journal_cjf *jnl = (journal_cjf*)jh;
1858
1859 journal_cjf_readlock(jnl);
1860
1861 u32 value = jnl->serial_begin;
1862
1863 if(serial != NULL)
1864 {
1865 *serial = value;
1866 ret = SUCCESS;
1867 }
1868
1869 journal_cjf_readunlock(jnl);
1870
1871 log_debug("cjf: %s,%p: get first serial: %i", jnl->journal_file_name, jnl->file, value);
1872
1873 return ret;
1874 }
1875
1876 static ya_result
journal_cjf_get_last_serial(journal * jh,u32 * serial)1877 journal_cjf_get_last_serial(journal *jh, u32 *serial)
1878 {
1879 ya_result ret = BUFFER_WOULD_OVERFLOW;
1880 journal_cjf *jnl = (journal_cjf*)jh;
1881
1882 journal_cjf_readlock(jnl);
1883
1884 u32 value = jnl->serial_end;
1885
1886 if(serial != NULL)
1887 {
1888 *serial = value;
1889 ret = SUCCESS;
1890 }
1891
1892 log_debug("cjf: %s,%p: get last serial: %i", jnl->journal_file_name, jnl->file, value);
1893
1894 journal_cjf_readunlock(jnl);
1895
1896 return ret;
1897 }
1898
1899 static ya_result
journal_cjf_get_serial_range(journal * jh,u32 * serial_start,u32 * serial_end)1900 journal_cjf_get_serial_range(journal *jh, u32 *serial_start, u32 *serial_end)
1901 {
1902 journal_cjf *jnl = (journal_cjf*)jh;
1903
1904 journal_cjf_readlock(jnl);
1905
1906 if(serial_start != NULL)
1907 {
1908 *serial_start = jnl->serial_begin;
1909 }
1910 if(serial_end != NULL)
1911 {
1912 *serial_end = jnl->serial_end;
1913 }
1914
1915 journal_cjf_readunlock(jnl);
1916
1917 return SUCCESS;
1918 }
1919
1920 static ya_result
journal_cjf_truncate_to_size(journal * jh,u32 size_)1921 journal_cjf_truncate_to_size(journal *jh, u32 size_)
1922 {
1923 journal_cjf *jnl = (journal_cjf*)jh;
1924
1925 if(size_ == 0)
1926 {
1927 journal_cjf_writelock(jnl);
1928
1929 log_debug("cjf: %s,%p: truncate to size 0", jnl->journal_file_name, jnl->file);
1930
1931 if(jnl->file == NULL)
1932 {
1933 journal_cjf_page_cache_close(jnl->file);
1934 file_pool_close(jnl->file);
1935 jnl->file = NULL;
1936 }
1937 file_pool_unlink_from_pool_and_filename(journal_file_pool, jnl->journal_file_name);
1938
1939 jnl->idxt.dirty = FALSE;
1940 journal_cjf_idxt_destroy(jnl);
1941
1942 jnl->file_maximum_size = MAX_U32;
1943 if(jnl->zone != NULL)
1944 {
1945 jnl->file_maximum_size = jnl->zone->wire_size >> 1;
1946 zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
1947 }
1948
1949 jnl->file = NULL;
1950
1951 jnl->last_page.file_offset = CJF_HEADER_SIZE;
1952 jnl->last_page.count = 0;
1953 jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
1954 jnl->last_page.serial_start = 0;
1955 jnl->last_page.serial_end = 0;
1956 jnl->last_page.records_limit = CJF_HEADER_SIZE + CJF_SECTION_INDEX_SIZE;
1957 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
1958
1959 jnl->serial_begin = 0;
1960 jnl->serial_end = 0;
1961 jnl->first_page_offset = CJF_HEADER_SIZE;
1962 jnl->page_table_file_offset = 0;
1963 jnl->last_soa_offset = 0;
1964 //jnl->file_maximum_size = MAX_U32;
1965
1966 //jnl->mtx.owner = LOCK_NONE;
1967 //jnl->mtx.count = 0;
1968
1969 jnl->flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN;
1970
1971 jnl->last_page.records_limit = jnl->last_page.file_offset + CJF_SECTION_INDEX_SIZE;
1972 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
1973
1974 //jnl->journal_file_name = strdup(filename);
1975
1976 journal_cjf_writeunlock(jnl);
1977
1978 return SUCCESS;
1979 }
1980 else
1981 {
1982 log_err("cjf: %s,%p: truncate to size != 0 not implemented", jnl->journal_file_name, jnl->file);
1983
1984 return ZDB_JOURNAL_FEATURE_NOT_SUPPORTED;
1985 }
1986 }
1987
1988 static ya_result
journal_cjf_truncate_to_serial(journal * jh,u32 serial_)1989 journal_cjf_truncate_to_serial(journal *jh, u32 serial_)
1990 {
1991 journal_cjf *jnl = (journal_cjf*)jh;
1992 (void)serial_;
1993 journal_cjf_readlock(jnl);
1994 log_err("cjf: %s,%p: truncate to serial not implemented", jnl->journal_file_name, jnl->file);
1995 journal_cjf_readunlock(jnl);
1996
1997 return ZDB_JOURNAL_FEATURE_NOT_SUPPORTED;
1998 }
1999
2000 /**
2001 *
2002 * @param jnl
2003 * @return
2004 */
2005
2006 static ya_result
journal_cjf_reopen(journal * jh)2007 journal_cjf_reopen(journal *jh)
2008 {
2009 #if 0 /* fix */
2010 #else
2011 return SUCCESS;
2012 #endif
2013 }
2014
2015 static void
journal_cjf_flush(journal * jh)2016 journal_cjf_flush(journal *jh)
2017 {
2018 journal_cjf *jnl = (journal_cjf*)jh;
2019
2020 log_debug("cjf: %s,%p: flush", jnl->journal_file_name, jnl->file);
2021
2022 journal_cjf_writelock(jnl);
2023
2024 #if ZDB_ZONE_HAS_JNL_REFERENCE
2025 zdb_zone *zone;
2026 if((zone = (zdb_zone*)jnl->zone) != NULL)
2027 {
2028 yassert(zone->journal == jh);
2029 zone->journal = NULL;
2030 }
2031 #endif
2032
2033 log_debug3("cjf: %s,%p: flushing to file", jnl->journal_file_name, jnl->file);
2034
2035 log_debug3("cjf: %s,%p: flushing to file: flushing PAGE cache", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2036 journal_cjf_page_cache_flush(jnl->file);
2037 log_debug3("cjf: %s,%p: flushing to file: flushing IDXT", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2038 journal_cjf_idxt_flush(jnl);
2039 log_debug3("cjf: %s,%p: flushing to file: flushing header", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2040 journal_cjf_header_flush(jnl);
2041
2042 journal_cjf_writeunlock(jnl);
2043 }
2044
2045 static ya_result
journal_cjf_close(journal * jh)2046 journal_cjf_close(journal *jh)
2047 {
2048 journal_cjf *jnl = (journal_cjf*)jh;
2049
2050 log_debug("cjf: %s,%p: close", jnl->journal_file_name, jnl->file);
2051
2052 journal_cjf_writelock(jnl);
2053
2054 #if ZDB_ZONE_HAS_JNL_REFERENCE
2055 zdb_zone *zone;
2056 if((zone = (zdb_zone*)jnl->zone) != NULL)
2057 {
2058 yassert(zone->journal == jh);
2059 zone->journal = NULL;
2060 }
2061 #endif
2062
2063 log_debug3("cjf: %s,%p: closing file", jnl->journal_file_name, jnl->file);
2064
2065 if(jnl->file != NULL)
2066 {
2067 log_debug3("cjf: %s,%p: closing file: closing PAGE cache", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2068 journal_cjf_page_cache_close(jnl->file);
2069 log_debug3("cjf: %s,%p: closing file: flushing IDXT", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2070 journal_cjf_idxt_flush(jnl);
2071 log_debug3("cjf: %s,%p: closing file: flushing header", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2072 journal_cjf_header_flush(jnl);
2073 log_debug3("cjf: %s,%p: closing file: closing file", jnl->journal_file_name, jnl->file, jnl->journal_file_name);
2074
2075 journal_cjf_idxt_destroy(jnl);
2076
2077 if(jnl->zone != NULL)
2078 {
2079 log_info("zone: %{dnsname}: closing journal file '%s'", jnl->origin, jnl->journal_file_name);
2080 }
2081 else
2082 {
2083 log_info("zone: <notset>: closing journal file '%s'", jnl->journal_file_name);
2084 }
2085
2086 file_pool_close(jnl->file);
2087 jnl->file = NULL;
2088 }
2089
2090 journal_cjf_writeunlock(jnl);
2091
2092 return SUCCESS;
2093 }
2094
2095 static void
journal_cjf_log_dump(journal * jh)2096 journal_cjf_log_dump(journal *jh)
2097 {
2098 journal_cjf *jnl = (journal_cjf*)jh;
2099 journal_cjf_readlock(jnl);
2100 log_debug("cjf: %s,%p: [%u; %u] '%s' (%i) lck=%i rc=%i", jnl->journal_file_name, jnl->file, jnl->serial_begin, jnl->serial_end, jnl->journal_file_name, jnl->file, jnl->mtx.owner, jnl->mtx.count);
2101 journal_cjf_readunlock(jnl);
2102 }
2103
2104 static ya_result
journal_cjf_get_domain(journal * jh,u8 * out_domain)2105 journal_cjf_get_domain(journal *jh, u8 *out_domain)
2106 {
2107 journal_cjf *jnl = (journal_cjf*)jh;
2108
2109 // don't: journal_cjf_readlock(jnl); as the field is constant until the destruction of the journal
2110
2111 dnsname_copy(out_domain, jnl->origin);
2112 return SUCCESS;
2113 }
2114
2115 /**
2116 * Links a zdb_zone and a journal
2117 *
2118 * @param jh
2119 * @param zone
2120 */
2121
2122 static void
journal_cjf_link_zone(journal * jh,zdb_zone * zone)2123 journal_cjf_link_zone(journal *jh, zdb_zone *zone)
2124 {
2125 journal_cjf *jnl = (journal_cjf*)jh;
2126
2127 journal_cjf_writelock(jnl);
2128
2129 if(jnl->zone != zone)
2130 {
2131 jnl->file_maximum_size = MAX_U32;
2132
2133 #if !ZDB_ZONE_HAS_JNL_REFERENCE
2134 if(jnl->zone != NULL)
2135 {
2136 log_debug("cjf: %s,%p: unlinking zone %{dnsname},%p", jnl->journal_file_name, jnl->file, jnl->zone->origin, jnl->zone);
2137
2138 zdb_zone_release((zdb_zone*)jnl->zone); //jnl->zone = NULL;
2139 }
2140
2141 if(zone != NULL)
2142 {
2143 zdb_zone_acquire(zone);
2144
2145 log_debug("cjf: %s,%p: linking to zone %{dnsname},%p", jnl->journal_file_name, jnl->file, zone->origin, zone);
2146
2147 jnl->file_maximum_size = zone->wire_size >> 1;
2148 }
2149 #endif
2150 jnl->zone = zone;
2151
2152 zdb_zone_info_get_zone_max_journal_size(jnl->origin, &jnl->file_maximum_size);
2153
2154 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
2155 }
2156
2157 journal_cjf_writeunlock(jnl);
2158 }
2159
2160 static void
journal_cjf_destroy(journal * jh)2161 journal_cjf_destroy(journal *jh)
2162 {
2163 journal_cjf *jnl = (journal_cjf*)jh;
2164
2165 yassert(jnl->rc == 0);
2166
2167 log_debug("cjf: %s,%p: destroy", jnl->journal_file_name, jnl->file);
2168
2169 journal_cjf_link_zone(jh, NULL);
2170
2171 shared_group_mutex_destroy(&jnl->mtx);
2172 free(jnl->origin);
2173 free(jnl->journal_file_name);
2174
2175 #if DEBUG
2176 memset(jnl, 0xfe, sizeof(journal_cjf));
2177 jnl->mru = FALSE;
2178 #endif
2179
2180 ZFREE_OBJECT(jnl);
2181 }
2182
2183 static const u8 *
journal_cjf_get_domain_const(journal * jh)2184 journal_cjf_get_domain_const(journal *jh)
2185 {
2186 journal_cjf *jnl = (journal_cjf*)jh;
2187 return jnl->origin;
2188 }
2189
2190 /*******************************************************************************
2191 *
2192 * vtbl handling functions
2193 *
2194 ******************************************************************************/
2195
2196 struct journal_vtbl journal_cjf_vtbl =
2197 {
2198 journal_cjf_get_format_name,
2199 journal_cjf_get_format_version,
2200 journal_cjf_reopen,
2201 journal_cjf_flush,
2202 journal_cjf_close,
2203 journal_cjf_append_ixfr_stream,
2204 journal_cjf_get_ixfr_stream_at_serial,
2205 journal_cjf_get_first_serial,
2206 journal_cjf_get_last_serial,
2207 journal_cjf_get_serial_range,
2208 journal_cjf_truncate_to_size,
2209 journal_cjf_truncate_to_serial,
2210 journal_cjf_log_dump,
2211 journal_cjf_get_domain,
2212 journal_cjf_destroy,
2213 journal_cjf_link_zone,
2214 journal_cjf_get_domain_const,
2215 JOURNAL_CLASS_NAME
2216 };
2217
2218 ya_result
journal_cjf_load_index_table(journal * jh)2219 journal_cjf_load_index_table(journal *jh)
2220 {
2221 journal_cjf *jnl = (journal_cjf*)jh;
2222
2223 // check if the index table (referencing all indexes)
2224
2225 if(jnl->page_table_file_offset == 0)
2226 {
2227 // the table does not exist or is corrupted
2228 // it has to be read again
2229 // from first_index_offset the index list has to be followed to recreate it
2230 // if such table only contains one entry, it could probably be ignored in most cases
2231 }
2232 else
2233 {
2234 // seek and load the table
2235 }
2236
2237 return -1;
2238 }
2239
2240 static journal_cjf*
journal_cjf_alloc_default(const u8 * origin,const char * filename)2241 journal_cjf_alloc_default(const u8 *origin, const char *filename)
2242 {
2243 journal_cjf *jnl;
2244 ZALLOC_OBJECT_OR_DIE(jnl, journal_cjf, JRNLCJF_TAG);
2245 ZEROMEMORY(jnl, sizeof(journal_cjf));
2246 jnl->vtbl = &journal_cjf_vtbl;
2247 jnl->mru_node.data = jnl;
2248 jnl->file = NULL;
2249 jnl->file_maximum_size = MAX_U32;
2250 jnl->first_page_offset = CJF_HEADER_SIZE;
2251 jnl->origin = dnsname_dup(origin);
2252 jnl->journal_file_name = strdup(filename);
2253 jnl->last_page.file_offset = CJF_HEADER_SIZE;
2254 jnl->last_page.size = CJF_SECTION_INDEX_SLOT_COUNT;
2255 jnl->last_page.records_limit = CJF_HEADER_SIZE + CJF_SECTION_INDEX_SIZE;
2256 jnl->last_page.file_offset_limit = jnl->file_maximum_size;
2257 jnl->flags = JOURNAL_CFJ_FLAGS_MY_ENDIAN|JOURNAL_CFJ_FLAGS_UNINITIALISED;
2258 shared_group_mutex_init(&jnl->mtx, &journal_shared_mtx, "journal-cjf");
2259 return jnl;
2260 }
2261 /**
2262 * The caller guarantees not to call this on an already opened journal
2263 *
2264 * Should not be called directly (only by journal_* functions.
2265 *
2266 * Opens or create a journal handling structure.
2267 * If the journal did not exist, the structure is returned without a file opened
2268 *
2269 * @param jh
2270 * @param origin
2271 * @param workingdir
2272 * @param create
2273 *
2274 * @return
2275 */
2276
2277 ya_result
journal_cjf_open_file(journal ** jhp,const char * filename,const u8 * origin,bool create)2278 journal_cjf_open_file(journal **jhp, const char *filename, const u8* origin, bool create)
2279 {
2280 // CFJ_PAGE_CACHE ->
2281 if(!journal_initialized)
2282 {
2283 journal_cjf_page_cache_init();
2284
2285 shared_group_shared_mutex_init(&journal_shared_mtx);
2286
2287 journal_file_pool = file_pool_init("journal-file-pool", 256);
2288
2289 journal_initialized = TRUE;
2290 }
2291
2292 journal_cjf *jnl = NULL;
2293 ya_result ret;
2294
2295 if(file_exists(filename) || create)
2296 {
2297 // instantiate and open the journal
2298
2299 ret = journal_cjf_init_from_file(&jnl, origin, filename, create);
2300
2301 if(ISOK(ret))
2302 {
2303 yassert(jnl != NULL); // to help scan-build
2304
2305 if(!((jnl->serial_begin == 0) && (jnl->serial_begin == jnl->serial_end))) // scan-build false-positive : if ISOK(ret) => jnl != NULL
2306 {
2307 journal_cjf_load_idxt(jnl);
2308 }
2309 }
2310 else
2311 {
2312 if(create)
2313 {
2314 log_err("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
2315 }
2316 else
2317 {
2318 log_debug("cjf: %{dnsname}: failed to open %s: %r", origin, filename, ret);
2319 }
2320
2321 if(jnl != NULL)
2322 {
2323 journal_cjf_destroy((journal*)jnl);
2324 #if DEBUG
2325 log_debug("cjf: %{dnsname}: journal file cannot be opened/created", origin);
2326 #endif
2327 }
2328
2329 return ZDB_ERROR_ICMTL_NOTFOUND;
2330 }
2331
2332 #if DEBUG
2333 log_debug("cjf: %{dnsname}: journal opened", origin);
2334 #endif
2335 *jhp = (journal*)jnl;
2336
2337 return SUCCESS;
2338 }
2339 else
2340 {
2341 #if DEBUG
2342 log_debug("cjf: %{dnsname}: journal file not found", origin);
2343 #endif
2344 return ZDB_ERROR_ICMTL_NOTFOUND;
2345 }
2346 }
2347
2348 /**
2349 * The caller guarantees not to call this on an already opened journal
2350 *
2351 * Should not be called directly (only by journal_* functions.
2352 *
2353 * Opens or create a journal handling structure.
2354 * If the journal did not exist, the structure is returned without a file opened
2355 *
2356 * @param jh
2357 * @param origin
2358 * @param workingdir
2359 * @param create
2360 *
2361 * @return
2362 */
2363
2364 ya_result
journal_cjf_open(journal ** jhp,const u8 * origin,const char * workingdir,bool create)2365 journal_cjf_open(journal **jhp, const u8* origin, const char *workingdir, bool create)
2366 {
2367 // CFJ_PAGE_CACHE <-
2368
2369 ya_result ret;
2370
2371 *jhp = NULL;
2372
2373 // generate the file name
2374
2375 char filename[PATH_MAX];
2376
2377 if((jhp == NULL) || (origin == NULL) || (workingdir == NULL))
2378 {
2379 return ZDB_JOURNAL_WRONG_PARAMETERS;
2380 }
2381
2382 #if DEBUG
2383 log_debug("cjf: trying to open journal for %{dnsname} in '%s'", origin, workingdir);
2384 #endif
2385
2386 /* get the soa of the loaded zone */
2387
2388 if(FAIL(ret = snformat(filename, sizeof(filename), CJF_WIRE_FILE_FORMAT, workingdir, origin)))
2389 {
2390 #if DEBUG
2391 log_debug("cjf: %{dnsname}: journal file name is too long", origin);
2392 #endif
2393 return ret;
2394 }
2395
2396 ret = journal_cjf_open_file(jhp, filename, origin, create);
2397
2398 return ret;
2399 }
2400
2401 void
journal_cjf_finalize()2402 journal_cjf_finalize()
2403 {
2404 journal_cjf_page_cache_finalize();
2405 file_pool_finalize(journal_file_pool);
2406 }
2407
2408 #endif
2409
2410 /** @} */
2411