1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 /** @defgroup
36 * @ingroup
37 * @brief
38 *
39 *
40 *
41 * @{
42 *
43 *----------------------------------------------------------------------------*/
44
45 /*------------------------------------------------------------------------------
46 * GLOBAL VARIABLES */
47
48 /*------------------------------------------------------------------------------
49 * STATIC PROTOTYPES */
50
51 /*------------------------------------------------------------------------------
52 * FUNCTIONS */
53
54 /** @brief Function ...
55 *
56 * ...
57 *
58 * @param ...
59 *
60 * @retval OK
61 * @retval NOK
62 */
63
// Selector macros defined ahead of the dnsdb includes that follow.
// NOTE(review): presumably they enable journal-internal declarations in those
// headers -- confirm against dnsdb/journal.h and dnsdb/journal-cjf-common.h.
#define ZDB_JOURNAL_CODE    1
#define JOURNAL_CJF_BASE    1
67
68 #include "dnsdb/dnsdb-config.h"
69 #include "dnsdb/journal-cjf-page-cache.h"
70 #include "dnsdb/journal-cjf-common.h"
71 #include "dnsdb/journal-cjf.h"
72
73 #include <sys/types.h>
74 #include <sys/stat.h>
75 #include <unistd.h>
76 #include <dirent.h>
77 #include <fcntl.h>
78
79 #include <dnscore/limited_input_stream.h>
80 #include <dnscore/mutex.h>
81 #include <dnscore/serial.h>
82 #include <dnscore/dns_resource_record.h>
83
84 #include <dnscore/ptr_set.h>
85 #include <dnscore/fdtools.h>
86
87 #include <dnscore/ptr_set.h>
88 #include <dnscore/u64_set.h>
89 #include <dnscore/list-dl.h>
90 #include <dnscore/list-sl.h>
91
92 #include <dnscore/ctrl-rfc.h>
93
94 #include "dnsdb/zdb_error.h"
95 #include "dnsdb/zdb_utils.h"
96 #include "dnsdb/journal.h"
97 #include "dnsdb/zdb_types.h"
98 #include "dnsdb/xfr_copy.h"
99 #include "dnsdb/zdb-zone-path-provider.h"
100
101 #if JOURNAL_CJF_ENABLED
102
// Journal debugging switch: only the DEBUG branch is meant to be edited,
// builds without DEBUG always get 0.
#if DEBUG
#define DEBUG_JOURNAL 0
#else
#define DEBUG_JOURNAL 0
#endif

// Lock kinds
#define LOCK_NONE   0
#define LOCK_READ   1
#define LOCK_WRITE  2

// Journal file extension and its length (without the terminator)
#define CJF_EXT         "cjf"
#define CJF_EXT_STRLEN  3

#define SOA_RDATA_SIZE_MAX 532

#define DO_SYNC 1
119
120 extern logger_handle* g_database_logger;
121 #define MODULE_MSG_HANDLE g_database_logger
122
123 /*
124 * Contains the journal (almost: not the matching start and end SOA)
125 */
126
127
// Geometry of a PAGE (section index): a 16-byte header followed by 510 slots
// of 8 bytes each, i.e. 16 + 8 * 510 = 4096 bytes.

#define CJF_SECTION_INDEX_SLOT_HEAD     16      // header size, in bytes
#define CJF_SECTION_INDEX_SLOT_SIZE     8       // slot size, in bytes
#define CJF_SECTION_INDEX_SLOT_COUNT    510     // slots following the header
#define CJF_SECTION_INDEX_SIZE          (CJF_SECTION_INDEX_SLOT_HEAD + CJF_SECTION_INDEX_SLOT_SIZE * CJF_SECTION_INDEX_SLOT_COUNT) // 4KB

// Evaluates to the same value as CJF_SECTION_INDEX_SIZE
#define CJF_PAGE_SIZE_IN_BYTE           (CJF_SECTION_INDEX_SLOT_HEAD + (CJF_SECTION_INDEX_SLOT_COUNT * CJF_SECTION_INDEX_SLOT_SIZE))
#define CJF_PAGE_ARBITRARY_UPDATE_SIZE  512

// The header size expressed in slots (16 / 8 = 2): index of the first item slot
#define CJF_SECTION_INDEX_SLOT_HEAD_SLOT (CJF_SECTION_INDEX_SLOT_HEAD / CJF_SECTION_INDEX_SLOT_SIZE)

// Log level used for the page cache traces
//#define log_cjf_page_debug log_debug5
#define log_cjf_page_debug log_debug
140
141 /*
142 * PAGE
143 *
144 * Serial Number Stream Offset
145 *
146 * The table of serials streams (IXFR) and their offset
147 * The value stored is of the serial ending the IXFR
148 */
149
150 #define PAGE_INITIALIZER {CJF_PAGE_MAGIC, 0, 0, CJF_SECTION_INDEX_SLOT_COUNT, 0, 0}
151
152 static bool empty_page_tbl_header_and_zeroes_initialised = FALSE;
153 static u8 empty_page_tbl_header_and_zeroes[CJF_SECTION_INDEX_SIZE];
154 static u32 journal_cfj_page_mru_size = 64;
155
156 //
157
158 struct journal_cjf_page_tbl
159 {
160 journal_cjf_page_tbl_header hdr;
161 journal_cjf_page_tbl_item items[CJF_SECTION_INDEX_SLOT_COUNT];
162 };
163
164 // PAGE are all over the place, going back to write into or read from one is a drag
165 // the first idea would be to have the current PAGE along with the journal, and it would solve MOST problems
166 // but PAGE may be needed at more than one place.
167 // So the idea is to have them cached:
168
// A PAGE cache entry is:
170 // _ maybe a reference count
171 // _ a file descriptor
172 // _ a file position
173 // _ a dirty flag, or a last_written offset
174 // _ maybe a count
175 // _ a 4K buffer
176 // _ maybe an MRU entry ?
177 // _ maybe a mutex
178 // the key ...
179 //
180 // one should be able to
// _ load a PAGE from the disk
// _ flush a PAGE back to the disk
// _ flush all PAGE linked to a file descriptor to the disk (KEY!)
// _ update the PAGE of a given position in a file to the disk (KEY!)
185 //
186 // there should not be a lot of PAGE per file descriptor, and PAGE should be flushed back at key times
187 // ie: closing the file descriptor, too many PAGE flush the least used ones, ...
188 //
189 // they should be pooled, I think
190
191 #define JCJFPCI_TAG 0x494350464a434a
192
193 struct journal_cjf_page_cache_item
194 {
195 u64 file_offset;
196 journal_cjf_page_tbl_item *buffer;
197 file_pool_file_t file;
198 s16 first_written_entry;
199 s16 last_written_entry; // set to the end means not dirty
200 };
201
202 typedef struct journal_cjf_page_cache_item journal_cjf_page_cache_item;
203
204 // fd => list_dl(page_cache_item)
205
206 static ptr_set page_cache_item_by_file = PTR_SET_PTR_EMPTY;
207 static list_dl_s page_cache_mru = {{NULL, NULL}, {NULL, NULL}, 0};
208 static group_mutex_t page_cache_mtx = GROUP_MUTEX_INITIALIZER;
209
210 static void journal_cjf_page_cache_free(journal_cjf_page_cache_item *page_cache);
211
212 void
journal_cjf_page_cache_init()213 journal_cjf_page_cache_init()
214 {
215 if(empty_page_tbl_header_and_zeroes_initialised)
216 {
217 return;
218 }
219
220 journal_cjf_page_tbl_header head = PAGE_INITIALIZER;
221 ZEROMEMORY(empty_page_tbl_header_and_zeroes, sizeof(empty_page_tbl_header_and_zeroes));
222 memcpy(empty_page_tbl_header_and_zeroes, &head, CJF_SECTION_INDEX_SLOT_HEAD);
223
224 list_dl_init(&page_cache_mru);
225
226 empty_page_tbl_header_and_zeroes_initialised = TRUE;
227 }
228
229 static void
journal_cjf_page_cache_remove_from_mru(journal_cjf_page_cache_item * page_cache)230 journal_cjf_page_cache_remove_from_mru(journal_cjf_page_cache_item *page_cache)
231 {
232 if(list_dl_size(&page_cache_mru) > 0)
233 {
234 list_dl_remove(&page_cache_mru, page_cache);
235 }
236 }
237
238 static void
journal_cjf_page_cache_add_to_mru(journal_cjf_page_cache_item * page_cache)239 journal_cjf_page_cache_add_to_mru(journal_cjf_page_cache_item *page_cache)
240 {
241 list_dl_insert(&page_cache_mru, page_cache);
242 }
243
244 /*
245 void
246 journal_cjf_page_cache_finalize()
247 {
248 }
249 */
250
251 /**
252 * called undirty because clear and clean are too similar
253 *
254 * @param page_cache
255 */
256
257 static void
journal_cjf_page_cache_item_undirty(journal_cjf_page_cache_item * page_cache)258 journal_cjf_page_cache_item_undirty(journal_cjf_page_cache_item *page_cache)
259 {
260 page_cache->first_written_entry = (1 + CJF_SECTION_INDEX_SLOT_COUNT);
261 page_cache->last_written_entry = -1;
262 }
263
264 static void
journal_cjf_page_cache_item_flush_internal(journal_cjf_page_cache_item * page_cache)265 journal_cjf_page_cache_item_flush_internal(journal_cjf_page_cache_item *page_cache)
266 {
267 file_pool_file_t file = page_cache->file;
268
269 // at file_offset, write from first to last entries
270
271 off_t first_offset = page_cache->file_offset + (page_cache->first_written_entry * CJF_SECTION_INDEX_SLOT_SIZE);
272 size_t size = ((page_cache->last_written_entry - page_cache->first_written_entry) + 1) * CJF_SECTION_INDEX_SLOT_SIZE;
273
274 log_cjf_page_debug("cjf: %s: flush page @%lli=%llx size=%i", file_pool_filename(file), first_offset, first_offset, size);
275
276 for(;;)
277 {
278 file_pool_seek(file, first_offset, SEEK_SET);
279
280 ya_result ret = file_pool_writefully(file, &page_cache->buffer[page_cache->first_written_entry], size);
281
282 if(ret == (s32)size)
283 {
284 break;
285 }
286
287 // should no be reached, but if an issue occur, better not hammer the logs
288
289 log_err("cjf: %s: flush page @%lli=%llx size=%i failed with: %r", file_pool_filename(file), first_offset, first_offset, size, ret);
290
291 sleep(1);
292 }
293
294 // mark the entry as not being used
295
296 journal_cjf_page_cache_item_undirty(page_cache);
297 }
298
299 /**
300 * @param page_cache
301 * @return
302 */
303
304 static void
journal_cjf_page_cache_item_flush(journal_cjf_page_cache_item * page_cache)305 journal_cjf_page_cache_item_flush(journal_cjf_page_cache_item *page_cache)
306 {
307 yassert(group_mutex_islocked(&page_cache_mtx));
308
309 if(page_cache != NULL)
310 {
311 if(page_cache->first_written_entry <= page_cache->last_written_entry)
312 {
313 file_pool_file_t file = page_cache->file;
314 size_t here;
315
316 file_pool_tell(file, &here);
317
318 journal_cjf_page_cache_item_flush_internal(page_cache);
319
320 file_pool_seek(file, (ssize_t)here, SEEK_SET);
321 }
322 }
323 else
324 {
325 log_err("cjf: journal_cjf_page_cache_flush_item(NULL)");
326 }
327 }
328
329 static void
journal_cjf_page_cache_cull()330 journal_cjf_page_cache_cull()
331 {
332 yassert(group_mutex_islocked(&page_cache_mtx));
333
334 // culls a cache entry at the end of the MRU
335
336 log_cjf_page_debug("cjf: cull pages");
337
338 while(list_dl_size(&page_cache_mru) > journal_cfj_page_mru_size)
339 {
340 // get the tail one
341 journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)list_dl_remove_last(&page_cache_mru);
342 // flush it
343 journal_cjf_page_cache_item_flush(page_cache);
344 // free it
345 }
346
347 log_cjf_page_debug("cjf: cull pages done");
348 }
349
350 static void
journal_cjf_page_mru_clear()351 journal_cjf_page_mru_clear()
352 {
353 yassert(group_mutex_islocked(&page_cache_mtx));
354
355 // culls a cache entry at the end of the MRU
356
357 log_cjf_page_debug("cjf: clear pages");
358
359 while(list_dl_size(&page_cache_mru) > 0)
360 {
361 // get the tail one
362 journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)list_dl_remove_last(&page_cache_mru);
363 // flush it
364 journal_cjf_page_cache_item_flush(page_cache);
365 // free it
366 journal_cjf_page_cache_free(page_cache);
367 }
368
369 log_cjf_page_debug("cjf: clear pages done");
370 }
371
372 static journal_cjf_page_cache_item *
journal_cjf_page_cache_new(file_pool_file_t file,u32 file_offset)373 journal_cjf_page_cache_new(file_pool_file_t file, u32 file_offset)
374 {
375 journal_cjf_page_cache_item *page_cache;
376
377 log_cjf_page_debug("cjf: %s: new page cache for offset %i", file_pool_filename(file), file_offset);
378
379 ZALLOC_OBJECT_OR_DIE(page_cache, journal_cjf_page_cache_item, JCJFPCI_TAG);
380 page_cache->file_offset = file_offset;
381 MALLOC_OR_DIE(journal_cjf_page_tbl_item*, page_cache->buffer, CJF_PAGE_SIZE_IN_BYTE , JCJFTI_TAG);
382 page_cache->file = file;
383 journal_cjf_page_cache_item_undirty(page_cache);
384
385 #if DEBUG
386 memset(page_cache->buffer, 0xfe, CJF_PAGE_SIZE_IN_BYTE);
387 #endif
388
389 return page_cache;
390 }
391
392 static void
journal_cjf_page_cache_free(journal_cjf_page_cache_item * page_cache)393 journal_cjf_page_cache_free(journal_cjf_page_cache_item *page_cache)
394 {
395 #if DEBUG
396 memset(page_cache->buffer, 0xfd, CJF_PAGE_SIZE_IN_BYTE);
397 #endif
398
399 free(page_cache->buffer);
400 ZFREE_OBJECT(page_cache);
401 }
402
403 static inline u64_set*
journal_cjf_page_cache_set_from_file(file_pool_file_t file)404 journal_cjf_page_cache_set_from_file(file_pool_file_t file)
405 {
406 yassert(group_mutex_islocked(&page_cache_mtx));
407
408 ptr_node *file_node = ptr_set_find(&page_cache_item_by_file, file);
409 if(file_node != NULL)
410 {
411 return (u64_set*)&file_node->value;
412 }
413 else
414 {
415 log_warn("cjf: %s: file is not cached", file_pool_filename(file));
416 return NULL;
417 }
418 }
419
420 static inline journal_cjf_page_cache_item*
journal_cjf_page_cache_from_set(u64_set * page_cache_set,u32 file_offset)421 journal_cjf_page_cache_from_set(u64_set* page_cache_set, u32 file_offset)
422 {
423 u64_node *file_offset_node = u64_set_find(page_cache_set, file_offset);
424
425 if(file_offset_node != NULL)
426 {
427 return (journal_cjf_page_cache_item*)file_offset_node->value;
428 }
429 else
430 {
431 log_err("cjf: page is not cached");
432 }
433
434 return NULL;
435 }
436
437 static inline journal_cjf_page_cache_item*
journal_cjf_page_cache_from_file(file_pool_file_t file,u32 file_offset)438 journal_cjf_page_cache_from_file(file_pool_file_t file, u32 file_offset)
439 {
440 u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
441
442 if(page_cache_set != NULL)
443 {
444 journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_set(page_cache_set, file_offset);
445
446 return page_cache;
447 }
448
449 return NULL;
450 }
451
452 static void
journal_cjf_page_cache_delete_from_file_and_offset(const file_pool_file_t file,u32 file_offset)453 journal_cjf_page_cache_delete_from_file_and_offset(const file_pool_file_t file, u32 file_offset)
454 {
455 yassert(group_mutex_islocked(&page_cache_mtx));
456
457 log_cjf_page_debug("cjf: %s: dropping page at offset %i", file_pool_filename(file), file_offset);
458
459 u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
460
461 if(page_cache_set != NULL)
462 {
463 // get the PAGE cache at the file_offset
464
465 journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_set(page_cache_set, file_offset);
466
467 if(page_cache != NULL)
468 {
469 u64_set_delete(page_cache_set, file_offset);
470
471 journal_cjf_page_cache_item_flush(page_cache);
472
473 journal_cjf_page_cache_remove_from_mru(page_cache);
474
475 journal_cjf_page_cache_free(page_cache);
476 }
477 }
478 }
479
480 static journal_cjf_page_cache_item*
journal_cjf_page_cache_get_for_rw(file_pool_file_t file,u64 file_offset)481 journal_cjf_page_cache_get_for_rw(file_pool_file_t file, u64 file_offset)
482 {
483 // get or create a node for the fd
484
485 ptr_node *file_node = ptr_set_insert(&page_cache_item_by_file, file);
486
487 // make some room, if needed
488
489 journal_cjf_page_cache_cull();
490
491 // get or create an PAGE cache at the file_offset
492
493 u64_node *file_offset_node = u64_set_insert((u64_set*)&file_node->value, file_offset);
494
495 journal_cjf_page_cache_item *page_cache;
496
497 if(file_offset_node->value != NULL)
498 {
499 // already got that one
500
501 page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
502 }
503 else
504 {
505 // have to create it
506 page_cache = journal_cjf_page_cache_new(file, file_offset);
507
508 // if the file is big enough: load it
509
510 size_t the_file_size;
511
512 if(ISOK(file_pool_get_size(file, &the_file_size)))
513 {
514 if(file_offset + CJF_PAGE_SIZE_IN_BYTE <= the_file_size)
515 {
516 size_t here = ~0ULL;
517 file_pool_tell(file, &here);
518 yassert(here != ~0ULL);
519
520 #ifndef NDEBUG
521 ssize_t there =
522 #endif
523 file_pool_seek(file, file_offset, SEEK_SET);
524 yassert(there == (ssize_t)file_offset);
525 // it is a READ, because to write the cache, it must first be loaded
526 ssize_t size = file_pool_readfully(file, page_cache->buffer, CJF_PAGE_SIZE_IN_BYTE);
527 #if DEBUG
528 log_memdump_ex(g_database_logger, MSG_DEBUG6, page_cache->buffer, size, 32, OSPRINT_DUMP_ADDRESS|OSPRINT_DUMP_HEX);
529 #endif
530 yassert(size == CJF_PAGE_SIZE_IN_BYTE);
531 (void)size;
532 #ifndef NDEBUG
533 there =
534 #endif
535 file_pool_seek(file, here, SEEK_SET);
536 #ifndef NDEBUG
537 yassert(there == (ssize_t)here);
538 (void)size;
539 (void)there;
540 (void)here;
541 #endif
542 }
543 }
544
545 file_offset_node->value = page_cache;
546
547 #if DEBUG
548 log_debug("test");
549 #endif
550 }
551
552 return page_cache;
553 }
554
555 static void
journal_cjf_page_cache_write(file_pool_file_t file,u64 file_offset,s16 offset,const void * value,u32 value_len)556 journal_cjf_page_cache_write(file_pool_file_t file, u64 file_offset, s16 offset, const void *value, u32 value_len)
557 {
558 log_cjf_page_debug("cjf: %s: writing slot %i from page at offset %llu", file_pool_filename(file), offset, file_offset);
559
560 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
561 // get or create a node for the fd
562
563 journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_get_for_rw(file, file_offset);
564
565 // update the last written entry to keep the highest value
566
567 if(offset > page_cache->last_written_entry)
568 {
569 s16 value_len_slots = ((value_len + 7) >> 3) - 1;
570
571 yassert(value_len_slots >= 0);
572
573 page_cache->last_written_entry = offset + value_len_slots;
574 }
575
576 // update the last written entry to keep the smallest value
577
578 if(offset < page_cache->first_written_entry)
579 {
580 page_cache->first_written_entry = offset;
581 }
582
583 // update the entry
584
585 memcpy(&page_cache->buffer[offset], value, value_len);
586
587 // move at the head of the MRU
588
589 journal_cjf_page_cache_remove_from_mru(page_cache);
590 journal_cjf_page_cache_add_to_mru(page_cache);
591
592 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
593 }
594
595 static void
journal_cjf_page_cache_read(file_pool_file_t file,u64 file_offset,s16 offset,void * value,u32 value_len)596 journal_cjf_page_cache_read(file_pool_file_t file, u64 file_offset, s16 offset, void *value, u32 value_len)
597 {
598 log_cjf_page_debug("cjf: %s: reading slot %i from page at offset %llu", file_pool_filename(file), offset, file_offset);
599
600 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
601 // get or create a node for the fd
602
603 journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_get_for_rw(file, file_offset);
604
605 yassert(offset + value_len <= CJF_PAGE_SIZE_IN_BYTE);
606
607 memcpy(value, &page_cache->buffer[offset], value_len);
608
609 // move at the head of the MRU
610
611 journal_cjf_page_cache_remove_from_mru(page_cache);
612 journal_cjf_page_cache_add_to_mru(page_cache);
613
614 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
615 }
616
617 /**
618 * @param file
619 * @param file_offset
620 * @param offset in slot size units (8 bytes)
621 * @param value
622 */
623
624 void
journal_cjf_page_cache_write_item(file_pool_file_t file,u64 file_offset,s16 offset,const journal_cjf_page_tbl_item * value)625 journal_cjf_page_cache_write_item(file_pool_file_t file, u64 file_offset, s16 offset, const journal_cjf_page_tbl_item *value)
626 {
627 yassert(file_offset >= CJF_HEADER_SIZE);
628 log_cjf_page_debug("cjf: %s: %lli=%llx [ %i ] write {%08x,%08x}", file_pool_filename(file), file_offset, file_offset, offset, value->ends_with_serial, value->stream_file_offset);
629
630 journal_cjf_page_cache_write(file, file_offset, offset + CJF_SECTION_INDEX_SLOT_HEAD_SLOT, value, sizeof(journal_cjf_page_tbl_item));
631 }
632
633 void
journal_cjf_page_cache_read_item(file_pool_file_t file,u64 file_offset,s16 offset,journal_cjf_page_tbl_item * value)634 journal_cjf_page_cache_read_item(file_pool_file_t file, u64 file_offset, s16 offset, journal_cjf_page_tbl_item *value)
635 {
636 yassert(file_offset >= CJF_HEADER_SIZE);
637 journal_cjf_page_cache_read(file, file_offset, offset + CJF_SECTION_INDEX_SLOT_HEAD_SLOT, value, sizeof(journal_cjf_page_tbl_item));
638 log_cjf_page_debug("cjf: %s: %lli=%llx [ %i ] read {%08x,%08x}", file_pool_filename(file), file_offset, file_offset, offset, value->ends_with_serial, value->stream_file_offset);
639 }
640
641 void
journal_cjf_page_cache_write_header(file_pool_file_t file,u64 file_offset,const journal_cjf_page_tbl_header * value)642 journal_cjf_page_cache_write_header(file_pool_file_t file, u64 file_offset, const journal_cjf_page_tbl_header *value)
643 {
644 yassert(file_offset >= CJF_HEADER_SIZE);
645 yassert(value->count <= value->size);
646 yassert(((value->count <= value->size) && (value->next_page_offset < file_offset)) || (value->next_page_offset > file_offset) || (value->next_page_offset == 0));
647 yassert(value->stream_end_offset != 0);
648
649 log_cjf_page_debug("cjf: %s: %lli=%llx update header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
650
651 journal_cjf_page_cache_write(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
652 }
653
654 void
journal_cjf_page_cache_write_new_header(file_pool_file_t file,u64 file_offset)655 journal_cjf_page_cache_write_new_header(file_pool_file_t file, u64 file_offset)
656 {
657 static const journal_cjf_page_tbl_header new_page_header = PAGE_INITIALIZER;
658 static const journal_cjf_page_tbl_item empty_item = {0,0};
659 const journal_cjf_page_tbl_header *value = &new_page_header;
660 yassert(file_offset >= CJF_HEADER_SIZE);
661 yassert(value->count <= value->size);
662 yassert(((value->count <= value->size) && (value->next_page_offset < file_offset)) || (value->next_page_offset > file_offset) || (value->next_page_offset == 0));
663
664 log_cjf_page_debug("cjf: %s: %lli=%llx write header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
665
666 journal_cjf_page_cache_write(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
667
668 for(int i = 0; i < CJF_SECTION_INDEX_SLOT_COUNT; ++i)
669 {
670 journal_cjf_page_cache_write_item(file, file_offset, i, &empty_item);
671 }
672 }
673
674 void
journal_cjf_page_cache_read_header(file_pool_file_t file,u64 file_offset,journal_cjf_page_tbl_header * value)675 journal_cjf_page_cache_read_header(file_pool_file_t file, u64 file_offset, journal_cjf_page_tbl_header *value)
676 {
677 yassert(file_offset >= CJF_HEADER_SIZE);
678 journal_cjf_page_cache_read(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
679
680 log_cjf_page_debug("cjf: %s: %lli=%llx read header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
681 }
682
683 static void
journal_cjf_page_cache_items_flush(u64_set * page_cache_set)684 journal_cjf_page_cache_items_flush(u64_set *page_cache_set)
685 {
686 yassert(group_mutex_islocked(&page_cache_mtx));
687
688 size_t here;
689 file_pool_file_t file = NULL;
690 u64_set_iterator iter;
691 u64_set_iterator_init(page_cache_set, &iter);
692 while(u64_set_iterator_hasnext(&iter))
693 {
694 u64_node *file_offset_node = u64_set_iterator_next_node(&iter);
695 journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
696 if(page_cache->first_written_entry <= page_cache->last_written_entry)
697 {
698 if(file == NULL)
699 {
700 file = page_cache->file;
701 yassert(file != NULL);
702 file_pool_tell(file, &here); // can only fail if &here is NULL
703 }
704
705 yassert(file == page_cache->file);
706
707 journal_cjf_page_cache_item_flush_internal(page_cache);
708
709 // do not move in the MRU : it will naturally fall down if not used anymore
710 // (yup, nothing to do)
711 }
712 }
713
714 if(file != NULL)
715 {
716 file_pool_seek(file, here, SEEK_SET);
717 }
718 }
719
720 static void
journal_cjf_page_cache_items_close(u64_set * page_cache_set)721 journal_cjf_page_cache_items_close(u64_set *page_cache_set)
722 {
723 yassert(group_mutex_islocked(&page_cache_mtx));
724
725 /*
726 list_sl_s delete_list;
727 list_sl_init(&delete_list);
728 */
729 size_t here;
730 file_pool_file_t file = NULL;
731 u64_set_iterator iter;
732 u64_set_iterator_init(page_cache_set, &iter);
733 while(u64_set_iterator_hasnext(&iter))
734 {
735 u64_node *file_offset_node = u64_set_iterator_next_node(&iter);
736 journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
737
738 if(page_cache->first_written_entry <= page_cache->last_written_entry) // page needs to be flushed ?
739 {
740 if(file == NULL)
741 {
742 file = page_cache->file;
743 yassert(file != NULL);
744 file_pool_tell(file, &here); // remember the position
745 }
746
747 yassert(file == page_cache->file);
748
749 journal_cjf_page_cache_item_flush_internal(page_cache);
750 /*
751 // delete the item
752 list_sl_push(&delete_list, page_cache);
753 */
754 file_offset_node->value = NULL;
755 }
756
757 journal_cjf_page_cache_remove_from_mru(page_cache);
758 journal_cjf_page_cache_free(page_cache);
759 }
760
761 if(file != NULL)
762 {
763 file_pool_seek(file, here, SEEK_SET); // go back to the position
764
765 // delete pages
766 /*
767 journal_cjf_page_cache_item *page_cache;
768 while((page_cache = (journal_cjf_page_cache_item*)list_sl_pop(&delete_list)) != NULL)
769 {
770 journal_cjf_page_cache_remove_from_mru(page_cache);
771
772 journal_cjf_page_cache_free(page_cache);
773 }
774 */
775 }
776
777 u64_set_destroy(page_cache_set);
778 }
779
780 void
journal_cjf_page_cache_flush(file_pool_file_t file)781 journal_cjf_page_cache_flush(file_pool_file_t file)
782 {
783 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
784 u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
785 if(page_cache_set != NULL)
786 {
787 journal_cjf_page_cache_items_flush(page_cache_set);
788 }
789 else
790 {
791 log_warn("cjf: %s: is not cached", file_pool_filename(file));
792 }
793
794 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
795 }
796
797 void
journal_cjf_page_cache_flush_page(file_pool_file_t file,u64 file_offset)798 journal_cjf_page_cache_flush_page(file_pool_file_t file, u64 file_offset)
799 {
800 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
801
802 journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_file(file, file_offset);
803
804 if(page_cache != NULL)
805 {
806 journal_cjf_page_cache_item_flush(page_cache);
807 }
808
809 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
810 }
811
812 void
journal_cjf_page_cache_clear(file_pool_file_t file,u64 file_offset)813 journal_cjf_page_cache_clear(file_pool_file_t file, u64 file_offset)
814 {
815 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
816
817 journal_cjf_page_cache_delete_from_file_and_offset(file, file_offset);
818
819 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
820 }
821
822 void
journal_cjf_page_cache_close(file_pool_file_t file)823 journal_cjf_page_cache_close(file_pool_file_t file)
824 {
825 group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
826
827 u64_set *page_cache_set = journal_cjf_page_cache_set_from_file(file);
828
829 if(page_cache_set != NULL)
830 {
831 // destroy the u64_set content
832 journal_cjf_page_cache_items_close(page_cache_set);
833
834 // delete the file_node
835 ptr_set_delete(&page_cache_item_by_file, file);
836 }
837 group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
838 }
839
840 static void
journal_cjf_page_cache_finalize_cb(ptr_node * file_node)841 journal_cjf_page_cache_finalize_cb(ptr_node *file_node)
842 {
843 u64_set *page_cache_set = (u64_set*)&file_node->value;
844 journal_cjf_page_cache_items_close(page_cache_set);
845 }
846
847 void
journal_cjf_page_cache_finalize()848 journal_cjf_page_cache_finalize()
849 {
850 group_mutex_write_lock(&page_cache_mtx);
851 ptr_set_callback_and_destroy(&page_cache_item_by_file, journal_cjf_page_cache_finalize_cb);
852 journal_cjf_page_mru_clear();
853 group_mutex_write_unlock(&page_cache_mtx);
854 }
855
856 #endif
857
858 /** @} */
859