1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 /** @defgroup
36  *  @ingroup
37  *  @brief
38  *
39  *
40  *
41  * @{
42  *
43  *----------------------------------------------------------------------------*/
44 
45 /*------------------------------------------------------------------------------
46  * GLOBAL VARIABLES */
47 
48 /*------------------------------------------------------------------------------
49  * STATIC PROTOTYPES */
50 
51 /*------------------------------------------------------------------------------
52  * FUNCTIONS */
53 
54 /** @brief Function ...
55  *
56  *  ...
57  *
58  *  @param ...
59  *
60  *  @retval OK
61  *  @retval NOK
62  */
63 
// Selection macros defined before any dnsdb header is included.
// NOTE(review): presumably consumed by the dnsdb headers included below to
// expose journal / CJF internals to this translation unit -- confirm.
#define ZDB_JOURNAL_CODE 1

#define JOURNAL_CJF_BASE 1
67 
68 #include "dnsdb/dnsdb-config.h"
69 #include "dnsdb/journal-cjf-page-cache.h"
70 #include "dnsdb/journal-cjf-common.h"
71 #include "dnsdb/journal-cjf.h"
72 
73 #include <sys/types.h>
74 #include <sys/stat.h>
75 #include <unistd.h>
76 #include <dirent.h>
77 #include <fcntl.h>
78 
79 #include <dnscore/limited_input_stream.h>
80 #include <dnscore/mutex.h>
81 #include <dnscore/serial.h>
82 #include <dnscore/dns_resource_record.h>
83 
84 #include <dnscore/ptr_set.h>
85 #include <dnscore/fdtools.h>
86 
87 #include <dnscore/ptr_set.h>
88 #include <dnscore/u64_set.h>
89 #include <dnscore/list-dl.h>
90 #include <dnscore/list-sl.h>
91 
92 #include <dnscore/ctrl-rfc.h>
93 
94 #include "dnsdb/zdb_error.h"
95 #include "dnsdb/zdb_utils.h"
96 #include "dnsdb/journal.h"
97 #include "dnsdb/zdb_types.h"
98 #include "dnsdb/xfr_copy.h"
99 #include "dnsdb/zdb-zone-path-provider.h"
100 
101 #if JOURNAL_CJF_ENABLED
102 
// Journal debugging switch. Both paths define it to 0, so it is disabled
// whether or not DEBUG is set.
#define DEBUG_JOURNAL 0
#if !DEBUG
#undef DEBUG_JOURNAL
#define DEBUG_JOURNAL 0
#endif

// Lock kinds (not referenced in the visible part of this file).
#define LOCK_NONE   0
#define LOCK_READ   1
#define LOCK_WRITE  2

// Journal file extension and its length in characters.
#define CJF_EXT "cjf"
#define CJF_EXT_STRLEN 3

// NOTE(review): presumably the maximum size of an SOA rdata in bytes -- confirm.
#define SOA_RDATA_SIZE_MAX 532

// Not referenced in the visible part of this file.
#define DO_SYNC 1

// Logger used by the log_* calls of this module.
extern logger_handle* g_database_logger;
#define MODULE_MSG_HANDLE g_database_logger
122 
123 /*
124  * Contains the journal (almost: not the matching start and end SOA)
125  */
126 
127 
// Geometry of a PAGE: a 16-byte header followed by 510 slots of 8 bytes.
#define CJF_SECTION_INDEX_SLOT_HEAD  16
#define CJF_SECTION_INDEX_SLOT_SIZE  8
#define CJF_SECTION_INDEX_SLOT_COUNT 510
#define CJF_SECTION_INDEX_SIZE       (CJF_SECTION_INDEX_SLOT_HEAD + CJF_SECTION_INDEX_SLOT_SIZE * CJF_SECTION_INDEX_SLOT_COUNT) // 4KB

// Same value as CJF_SECTION_INDEX_SIZE (16 + 510 * 8 = 4096); this is the size
// of the buffer held by each cache entry.
#define CJF_PAGE_SIZE_IN_BYTE        (CJF_SECTION_INDEX_SLOT_HEAD + (CJF_SECTION_INDEX_SLOT_COUNT * CJF_SECTION_INDEX_SLOT_SIZE))
// Not referenced in the visible part of this file.
#define CJF_PAGE_ARBITRARY_UPDATE_SIZE      512

// Size of the header expressed in slots (16 / 8 = 2): item N of a page lives
// at slot N + CJF_SECTION_INDEX_SLOT_HEAD_SLOT.
#define CJF_SECTION_INDEX_SLOT_HEAD_SLOT (CJF_SECTION_INDEX_SLOT_HEAD / CJF_SECTION_INDEX_SLOT_SIZE)

// Log level used for the page cache traces of this file.
//#define log_cjf_page_debug log_debug5
#define log_cjf_page_debug log_debug
140 
141 /*
142  * PAGE
143  *
144  * Serial Number Stream Offset
145  *
146  * The table of serials streams (IXFR) and their offset
147  * The value stored is of the serial ending the IXFR
148  */
149 
// Initial content of a page header: the page magic, then zeroes except for the
// fourth member which is set to the slot count.
// NOTE(review): presumably the fourth member is the header's 'size' field -- confirm
// against the journal_cjf_page_tbl_header declaration.
#define PAGE_INITIALIZER {CJF_PAGE_MAGIC, 0, 0, CJF_SECTION_INDEX_SLOT_COUNT, 0, 0}

// Set once journal_cjf_page_cache_init() has built the blank page image below.
static bool empty_page_tbl_header_and_zeroes_initialised = FALSE;
// Image of a blank page: an initial header followed by zeroes.
static u8 empty_page_tbl_header_and_zeroes[CJF_SECTION_INDEX_SIZE];
// Maximum number of entries kept in the MRU list before culling.
static u32 journal_cfj_page_mru_size = 64;
155 
156 //
157 
// Layout of a complete PAGE: one header followed by
// CJF_SECTION_INDEX_SLOT_COUNT items.
struct journal_cjf_page_tbl
{
    journal_cjf_page_tbl_header hdr;
    journal_cjf_page_tbl_item items[CJF_SECTION_INDEX_SLOT_COUNT];
};
163 
// PAGEs are scattered all over the file, so going back to write into or read from one is costly.
// The first idea would be to keep the current PAGE along with the journal, which would solve MOST problems,
// but a PAGE may be needed in more than one place.
// So the idea is to have them cached:
168 
// A PAGE cache entry is:
// _ maybe a reference count
// _ a file descriptor
// _ a file position
// _ a dirty flag, or a last_written offset
// _ maybe a count
// _ a 4K buffer
// _ maybe an MRU entry?
// _ maybe a mutex
// the key ...
179 //
// One should be able to:
// _ load a PAGE from the disk
// _ flush a PAGE back to the disk
// _ flush all the PAGEs linked to a file descriptor to the disk (KEY!)
// _ update the PAGE at a given position in a file on the disk (KEY!)
//
// There should not be many PAGEs per file descriptor, and PAGEs should be flushed back at key times,
// e.g. when closing the file descriptor, or when there are too many PAGEs (flush the least used ones), ...
//
// They should be pooled, I think.
190 
// Allocation tag: the bytes spell "JCJFPCI" when read from the lowest byte up.
#define JCJFPCI_TAG 0x494350464a434a

// One cached PAGE of one journal file.
struct journal_cjf_page_cache_item
{
    u64 file_offset;                    // position of the page in the file
    journal_cjf_page_tbl_item *buffer;  // CJF_PAGE_SIZE_IN_BYTE bytes, indexed by slot
    file_pool_file_t file;              // file the page belongs to
    s16 first_written_entry;            // first slot modified since the last flush
    s16 last_written_entry;             // last slot modified since the last flush;
                                        // the page is dirty when first <= last
                                        // (a clean page has first = slot count + 1, last = -1)
};

typedef struct journal_cjf_page_cache_item journal_cjf_page_cache_item;
203 
// file => u64_set(page file offset => page_cache_item)
// The value field of each node of this set is used in place as a u64_set.

static ptr_set page_cache_item_by_file = PTR_SET_PTR_EMPTY;
// List of the recently used pages: entries are added with list_dl_insert and
// culled with list_dl_remove_last.
static list_dl_s page_cache_mru = {{NULL, NULL}, {NULL, NULL}, 0};
// Taken (as a writer) by every public entry point that touches the cache.
static group_mutex_t page_cache_mtx = GROUP_MUTEX_INITIALIZER;

// Forward declaration: used by journal_cjf_page_mru_clear() before its definition.
static void journal_cjf_page_cache_free(journal_cjf_page_cache_item *page_cache);
211 
212 void
journal_cjf_page_cache_init()213 journal_cjf_page_cache_init()
214 {
215     if(empty_page_tbl_header_and_zeroes_initialised)
216     {
217         return;
218     }
219 
220     journal_cjf_page_tbl_header head = PAGE_INITIALIZER;
221     ZEROMEMORY(empty_page_tbl_header_and_zeroes, sizeof(empty_page_tbl_header_and_zeroes));
222     memcpy(empty_page_tbl_header_and_zeroes, &head, CJF_SECTION_INDEX_SLOT_HEAD);
223 
224     list_dl_init(&page_cache_mru);
225 
226     empty_page_tbl_header_and_zeroes_initialised = TRUE;
227 }
228 
229 static void
journal_cjf_page_cache_remove_from_mru(journal_cjf_page_cache_item * page_cache)230 journal_cjf_page_cache_remove_from_mru(journal_cjf_page_cache_item *page_cache)
231 {
232     if(list_dl_size(&page_cache_mru) > 0)
233     {
234         list_dl_remove(&page_cache_mru, page_cache);
235     }
236 }
237 
238 static void
journal_cjf_page_cache_add_to_mru(journal_cjf_page_cache_item * page_cache)239 journal_cjf_page_cache_add_to_mru(journal_cjf_page_cache_item *page_cache)
240 {
241     list_dl_insert(&page_cache_mru, page_cache);
242 }
243 
244 /*
245 void
246 journal_cjf_page_cache_finalize()
247 {
248 }
249 */
250 
251 /**
252  * called undirty because clear and clean are too similar
253  *
254  * @param page_cache
255  */
256 
257 static void
journal_cjf_page_cache_item_undirty(journal_cjf_page_cache_item * page_cache)258 journal_cjf_page_cache_item_undirty(journal_cjf_page_cache_item *page_cache)
259 {
260     page_cache->first_written_entry = (1 + CJF_SECTION_INDEX_SLOT_COUNT);
261     page_cache->last_written_entry = -1;
262 }
263 
264 static void
journal_cjf_page_cache_item_flush_internal(journal_cjf_page_cache_item * page_cache)265 journal_cjf_page_cache_item_flush_internal(journal_cjf_page_cache_item *page_cache)
266 {
267     file_pool_file_t file = page_cache->file;
268 
269     // at file_offset, write from first to last entries
270 
271     off_t first_offset = page_cache->file_offset + (page_cache->first_written_entry * CJF_SECTION_INDEX_SLOT_SIZE);
272     size_t size = ((page_cache->last_written_entry - page_cache->first_written_entry) + 1) * CJF_SECTION_INDEX_SLOT_SIZE;
273 
274     log_cjf_page_debug("cjf: %s: flush page @%lli=%llx size=%i", file_pool_filename(file), first_offset, first_offset, size);
275 
276     for(;;)
277     {
278         file_pool_seek(file, first_offset, SEEK_SET);
279 
280         ya_result ret = file_pool_writefully(file, &page_cache->buffer[page_cache->first_written_entry], size);
281 
282         if(ret == (s32)size)
283         {
284             break;
285         }
286 
287         // should no be reached, but if an issue occur, better not hammer the logs
288 
289         log_err("cjf: %s: flush page @%lli=%llx size=%i failed with: %r", file_pool_filename(file), first_offset, first_offset, size, ret);
290 
291         sleep(1);
292     }
293 
294     // mark the entry as not being used
295 
296     journal_cjf_page_cache_item_undirty(page_cache);
297 }
298 
299 /**
300  * @param page_cache
301  * @return
302  */
303 
304 static void
journal_cjf_page_cache_item_flush(journal_cjf_page_cache_item * page_cache)305 journal_cjf_page_cache_item_flush(journal_cjf_page_cache_item *page_cache)
306 {
307     yassert(group_mutex_islocked(&page_cache_mtx));
308 
309     if(page_cache != NULL)
310     {
311         if(page_cache->first_written_entry <= page_cache->last_written_entry)
312         {
313             file_pool_file_t file = page_cache->file;
314             size_t here;
315 
316             file_pool_tell(file, &here);
317 
318             journal_cjf_page_cache_item_flush_internal(page_cache);
319 
320             file_pool_seek(file, (ssize_t)here, SEEK_SET);
321         }
322     }
323     else
324     {
325         log_err("cjf: journal_cjf_page_cache_flush_item(NULL)");
326     }
327 }
328 
329 static void
journal_cjf_page_cache_cull()330 journal_cjf_page_cache_cull()
331 {
332     yassert(group_mutex_islocked(&page_cache_mtx));
333 
334     // culls a cache entry at the end of the MRU
335 
336     log_cjf_page_debug("cjf: cull pages");
337 
338     while(list_dl_size(&page_cache_mru) > journal_cfj_page_mru_size)
339     {
340         // get the tail one
341         journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)list_dl_remove_last(&page_cache_mru);
342         // flush it
343         journal_cjf_page_cache_item_flush(page_cache);
344         // free it
345     }
346 
347     log_cjf_page_debug("cjf: cull pages done");
348 }
349 
350 static void
journal_cjf_page_mru_clear()351 journal_cjf_page_mru_clear()
352 {
353     yassert(group_mutex_islocked(&page_cache_mtx));
354 
355     // culls a cache entry at the end of the MRU
356 
357     log_cjf_page_debug("cjf: clear pages");
358 
359     while(list_dl_size(&page_cache_mru) > 0)
360     {
361         // get the tail one
362         journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)list_dl_remove_last(&page_cache_mru);
363         // flush it
364         journal_cjf_page_cache_item_flush(page_cache);
365         // free it
366         journal_cjf_page_cache_free(page_cache);
367     }
368 
369     log_cjf_page_debug("cjf: clear pages done");
370 }
371 
372 static journal_cjf_page_cache_item *
journal_cjf_page_cache_new(file_pool_file_t file,u32 file_offset)373 journal_cjf_page_cache_new(file_pool_file_t file, u32 file_offset)
374 {
375     journal_cjf_page_cache_item *page_cache;
376 
377     log_cjf_page_debug("cjf: %s: new page cache for offset %i", file_pool_filename(file), file_offset);
378 
379     ZALLOC_OBJECT_OR_DIE(page_cache, journal_cjf_page_cache_item, JCJFPCI_TAG);
380     page_cache->file_offset = file_offset;
381     MALLOC_OR_DIE(journal_cjf_page_tbl_item*, page_cache->buffer, CJF_PAGE_SIZE_IN_BYTE , JCJFTI_TAG);
382     page_cache->file = file;
383     journal_cjf_page_cache_item_undirty(page_cache);
384 
385 #if DEBUG
386     memset(page_cache->buffer, 0xfe, CJF_PAGE_SIZE_IN_BYTE);
387 #endif
388 
389     return page_cache;
390 }
391 
392 static void
journal_cjf_page_cache_free(journal_cjf_page_cache_item * page_cache)393 journal_cjf_page_cache_free(journal_cjf_page_cache_item *page_cache)
394 {
395 #if DEBUG
396     memset(page_cache->buffer, 0xfd, CJF_PAGE_SIZE_IN_BYTE);
397 #endif
398 
399     free(page_cache->buffer);
400     ZFREE_OBJECT(page_cache);
401 }
402 
403 static inline u64_set*
journal_cjf_page_cache_set_from_file(file_pool_file_t file)404 journal_cjf_page_cache_set_from_file(file_pool_file_t file)
405 {
406     yassert(group_mutex_islocked(&page_cache_mtx));
407 
408     ptr_node *file_node = ptr_set_find(&page_cache_item_by_file, file);
409     if(file_node != NULL)
410     {
411         return (u64_set*)&file_node->value;
412     }
413     else
414     {
415         log_warn("cjf: %s: file is not cached", file_pool_filename(file));
416         return NULL;
417     }
418 }
419 
420 static inline journal_cjf_page_cache_item*
journal_cjf_page_cache_from_set(u64_set * page_cache_set,u32 file_offset)421 journal_cjf_page_cache_from_set(u64_set* page_cache_set, u32 file_offset)
422 {
423     u64_node *file_offset_node = u64_set_find(page_cache_set, file_offset);
424 
425     if(file_offset_node != NULL)
426     {
427         return (journal_cjf_page_cache_item*)file_offset_node->value;
428     }
429     else
430     {
431         log_err("cjf: page is not cached");
432     }
433 
434     return NULL;
435 }
436 
437 static inline journal_cjf_page_cache_item*
journal_cjf_page_cache_from_file(file_pool_file_t file,u32 file_offset)438 journal_cjf_page_cache_from_file(file_pool_file_t file, u32 file_offset)
439 {
440     u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
441 
442     if(page_cache_set != NULL)
443     {
444         journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_set(page_cache_set, file_offset);
445 
446         return page_cache;
447     }
448 
449     return NULL;
450 }
451 
452 static void
journal_cjf_page_cache_delete_from_file_and_offset(const file_pool_file_t file,u32 file_offset)453 journal_cjf_page_cache_delete_from_file_and_offset(const file_pool_file_t file, u32 file_offset)
454 {
455     yassert(group_mutex_islocked(&page_cache_mtx));
456 
457     log_cjf_page_debug("cjf: %s: dropping page at offset %i", file_pool_filename(file), file_offset);
458 
459     u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
460 
461     if(page_cache_set != NULL)
462     {
463         // get the PAGE cache at the file_offset
464 
465         journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_set(page_cache_set, file_offset);
466 
467         if(page_cache != NULL)
468         {
469             u64_set_delete(page_cache_set, file_offset);
470 
471             journal_cjf_page_cache_item_flush(page_cache);
472 
473             journal_cjf_page_cache_remove_from_mru(page_cache);
474 
475             journal_cjf_page_cache_free(page_cache);
476         }
477     }
478 }
479 
480 static journal_cjf_page_cache_item*
journal_cjf_page_cache_get_for_rw(file_pool_file_t file,u64 file_offset)481 journal_cjf_page_cache_get_for_rw(file_pool_file_t file, u64 file_offset)
482 {
483     // get or create a node for the fd
484 
485     ptr_node *file_node = ptr_set_insert(&page_cache_item_by_file, file);
486 
487     // make some room, if needed
488 
489     journal_cjf_page_cache_cull();
490 
491     // get or create an PAGE cache at the file_offset
492 
493     u64_node *file_offset_node = u64_set_insert((u64_set*)&file_node->value, file_offset);
494 
495     journal_cjf_page_cache_item *page_cache;
496 
497     if(file_offset_node->value != NULL)
498     {
499         // already got that one
500 
501         page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
502     }
503     else
504     {
505         // have to create it
506         page_cache = journal_cjf_page_cache_new(file, file_offset);
507 
508         // if the file is big enough: load it
509 
510         size_t the_file_size;
511 
512         if(ISOK(file_pool_get_size(file, &the_file_size)))
513         {
514             if(file_offset + CJF_PAGE_SIZE_IN_BYTE <= the_file_size)
515             {
516                 size_t here = ~0ULL;
517                 file_pool_tell(file, &here);
518                 yassert(here != ~0ULL);
519 
520 #ifndef NDEBUG
521                 ssize_t there =
522 #endif
523                 file_pool_seek(file, file_offset, SEEK_SET);
524                 yassert(there == (ssize_t)file_offset);
525                 // it is a READ, because to write the cache, it must first be loaded
526                 ssize_t size = file_pool_readfully(file, page_cache->buffer, CJF_PAGE_SIZE_IN_BYTE);
527 #if DEBUG
528                 log_memdump_ex(g_database_logger, MSG_DEBUG6, page_cache->buffer, size, 32, OSPRINT_DUMP_ADDRESS|OSPRINT_DUMP_HEX);
529 #endif
530                 yassert(size == CJF_PAGE_SIZE_IN_BYTE);
531                 (void)size;
532 #ifndef NDEBUG
533                 there =
534 #endif
535 file_pool_seek(file, here, SEEK_SET);
536 #ifndef NDEBUG
537                 yassert(there == (ssize_t)here);
538                 (void)size;
539                 (void)there;
540                 (void)here;
541 #endif
542             }
543         }
544 
545         file_offset_node->value = page_cache;
546 
547 #if DEBUG
548         log_debug("test");
549 #endif
550     }
551 
552     return page_cache;
553 }
554 
555 static void
journal_cjf_page_cache_write(file_pool_file_t file,u64 file_offset,s16 offset,const void * value,u32 value_len)556 journal_cjf_page_cache_write(file_pool_file_t file, u64 file_offset, s16 offset, const void *value, u32 value_len)
557 {
558     log_cjf_page_debug("cjf: %s: writing slot %i from page at offset %llu", file_pool_filename(file), offset, file_offset);
559 
560     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
561     // get or create a node for the fd
562 
563     journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_get_for_rw(file, file_offset);
564 
565     // update the last written entry to keep the highest value
566 
567     if(offset > page_cache->last_written_entry)
568     {
569         s16 value_len_slots = ((value_len + 7) >> 3) - 1;
570 
571         yassert(value_len_slots >= 0);
572 
573         page_cache->last_written_entry = offset + value_len_slots;
574     }
575 
576     // update the last written entry to keep the smallest value
577 
578     if(offset < page_cache->first_written_entry)
579     {
580         page_cache->first_written_entry = offset;
581     }
582 
583     // update the entry
584 
585     memcpy(&page_cache->buffer[offset], value, value_len);
586 
587     // move at the head of the MRU
588 
589     journal_cjf_page_cache_remove_from_mru(page_cache);
590     journal_cjf_page_cache_add_to_mru(page_cache);
591 
592     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
593 }
594 
595 static void
journal_cjf_page_cache_read(file_pool_file_t file,u64 file_offset,s16 offset,void * value,u32 value_len)596 journal_cjf_page_cache_read(file_pool_file_t file, u64 file_offset, s16 offset, void *value, u32 value_len)
597 {
598     log_cjf_page_debug("cjf: %s: reading slot %i from page at offset %llu", file_pool_filename(file), offset, file_offset);
599 
600     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
601     // get or create a node for the fd
602 
603     journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_get_for_rw(file, file_offset);
604 
605     yassert(offset + value_len <= CJF_PAGE_SIZE_IN_BYTE);
606 
607     memcpy(value, &page_cache->buffer[offset], value_len);
608 
609     // move at the head of the MRU
610 
611     journal_cjf_page_cache_remove_from_mru(page_cache);
612     journal_cjf_page_cache_add_to_mru(page_cache);
613 
614     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
615 }
616 
617 /**
618  * @param file
619  * @param file_offset
620  * @param offset in slot size units (8 bytes)
621  * @param value
622  */
623 
624 void
journal_cjf_page_cache_write_item(file_pool_file_t file,u64 file_offset,s16 offset,const journal_cjf_page_tbl_item * value)625 journal_cjf_page_cache_write_item(file_pool_file_t file, u64 file_offset, s16 offset, const journal_cjf_page_tbl_item *value)
626 {
627     yassert(file_offset >= CJF_HEADER_SIZE);
628     log_cjf_page_debug("cjf: %s: %lli=%llx [ %i ] write {%08x,%08x}", file_pool_filename(file), file_offset, file_offset, offset, value->ends_with_serial, value->stream_file_offset);
629 
630     journal_cjf_page_cache_write(file, file_offset, offset + CJF_SECTION_INDEX_SLOT_HEAD_SLOT, value, sizeof(journal_cjf_page_tbl_item));
631 }
632 
633 void
journal_cjf_page_cache_read_item(file_pool_file_t file,u64 file_offset,s16 offset,journal_cjf_page_tbl_item * value)634 journal_cjf_page_cache_read_item(file_pool_file_t file, u64 file_offset, s16 offset, journal_cjf_page_tbl_item *value)
635 {
636     yassert(file_offset >= CJF_HEADER_SIZE);
637     journal_cjf_page_cache_read(file, file_offset, offset + CJF_SECTION_INDEX_SLOT_HEAD_SLOT, value, sizeof(journal_cjf_page_tbl_item));
638     log_cjf_page_debug("cjf: %s: %lli=%llx [ %i ] read {%08x,%08x}", file_pool_filename(file), file_offset, file_offset, offset, value->ends_with_serial, value->stream_file_offset);
639 }
640 
641 void
journal_cjf_page_cache_write_header(file_pool_file_t file,u64 file_offset,const journal_cjf_page_tbl_header * value)642 journal_cjf_page_cache_write_header(file_pool_file_t file, u64 file_offset,  const journal_cjf_page_tbl_header *value)
643 {
644     yassert(file_offset >= CJF_HEADER_SIZE);
645     yassert(value->count <= value->size);
646     yassert(((value->count <= value->size) && (value->next_page_offset < file_offset)) || (value->next_page_offset > file_offset) || (value->next_page_offset == 0));
647     yassert(value->stream_end_offset != 0);
648 
649     log_cjf_page_debug("cjf: %s: %lli=%llx update header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
650 
651     journal_cjf_page_cache_write(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
652 }
653 
654 void
journal_cjf_page_cache_write_new_header(file_pool_file_t file,u64 file_offset)655 journal_cjf_page_cache_write_new_header(file_pool_file_t file, u64 file_offset)
656 {
657     static const journal_cjf_page_tbl_header new_page_header = PAGE_INITIALIZER;
658     static const journal_cjf_page_tbl_item empty_item = {0,0};
659     const journal_cjf_page_tbl_header *value = &new_page_header;
660     yassert(file_offset >= CJF_HEADER_SIZE);
661     yassert(value->count <= value->size);
662     yassert(((value->count <= value->size) && (value->next_page_offset < file_offset)) || (value->next_page_offset > file_offset) || (value->next_page_offset == 0));
663 
664     log_cjf_page_debug("cjf: %s: %lli=%llx write header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
665 
666     journal_cjf_page_cache_write(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
667 
668     for(int i = 0; i < CJF_SECTION_INDEX_SLOT_COUNT; ++i)
669     {
670         journal_cjf_page_cache_write_item(file, file_offset, i, &empty_item);
671     }
672 }
673 
674 void
journal_cjf_page_cache_read_header(file_pool_file_t file,u64 file_offset,journal_cjf_page_tbl_header * value)675 journal_cjf_page_cache_read_header(file_pool_file_t file, u64 file_offset,  journal_cjf_page_tbl_header *value)
676 {
677     yassert(file_offset >= CJF_HEADER_SIZE);
678     journal_cjf_page_cache_read(file, file_offset, 0, value, CJF_SECTION_INDEX_SLOT_HEAD);
679 
680     log_cjf_page_debug("cjf: %s: %lli=%llx read header {%08x,%3d,%3d,%08x}", file_pool_filename(file), file_offset, file_offset, value->next_page_offset, value->count, value->size, value->stream_end_offset);
681 }
682 
683 static void
journal_cjf_page_cache_items_flush(u64_set * page_cache_set)684 journal_cjf_page_cache_items_flush(u64_set *page_cache_set)
685 {
686     yassert(group_mutex_islocked(&page_cache_mtx));
687 
688     size_t here;
689     file_pool_file_t file = NULL;
690     u64_set_iterator iter;
691     u64_set_iterator_init(page_cache_set, &iter);
692     while(u64_set_iterator_hasnext(&iter))
693     {
694         u64_node *file_offset_node = u64_set_iterator_next_node(&iter);
695         journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
696         if(page_cache->first_written_entry <= page_cache->last_written_entry)
697         {
698             if(file == NULL)
699             {
700                 file = page_cache->file;
701                 yassert(file != NULL);
702                 file_pool_tell(file, &here); // can only fail if &here is NULL
703             }
704 
705             yassert(file == page_cache->file);
706 
707             journal_cjf_page_cache_item_flush_internal(page_cache);
708 
709             // do not move in the MRU : it will naturally fall down if not used anymore
710             // (yup, nothing to do)
711         }
712     }
713 
714     if(file != NULL)
715     {
716         file_pool_seek(file, here, SEEK_SET);
717     }
718 }
719 
720 static void
journal_cjf_page_cache_items_close(u64_set * page_cache_set)721 journal_cjf_page_cache_items_close(u64_set *page_cache_set)
722 {
723     yassert(group_mutex_islocked(&page_cache_mtx));
724 
725     /*
726     list_sl_s delete_list;
727     list_sl_init(&delete_list);
728     */
729     size_t here;
730     file_pool_file_t file = NULL;
731     u64_set_iterator iter;
732     u64_set_iterator_init(page_cache_set, &iter);
733     while(u64_set_iterator_hasnext(&iter))
734     {
735         u64_node *file_offset_node = u64_set_iterator_next_node(&iter);
736         journal_cjf_page_cache_item *page_cache = (journal_cjf_page_cache_item*)file_offset_node->value;
737 
738         if(page_cache->first_written_entry <= page_cache->last_written_entry) // page needs to be flushed ?
739         {
740             if(file == NULL)
741             {
742                 file = page_cache->file;
743                 yassert(file != NULL);
744                 file_pool_tell(file, &here); // remember the position
745             }
746 
747             yassert(file == page_cache->file);
748 
749             journal_cjf_page_cache_item_flush_internal(page_cache);
750             /*
751             // delete the item
752             list_sl_push(&delete_list, page_cache);
753             */
754             file_offset_node->value = NULL;
755         }
756 
757         journal_cjf_page_cache_remove_from_mru(page_cache);
758         journal_cjf_page_cache_free(page_cache);
759     }
760 
761     if(file != NULL)
762     {
763         file_pool_seek(file, here, SEEK_SET); // go back to the position
764 
765         // delete pages
766         /*
767         journal_cjf_page_cache_item *page_cache;
768         while((page_cache = (journal_cjf_page_cache_item*)list_sl_pop(&delete_list)) != NULL)
769         {
770             journal_cjf_page_cache_remove_from_mru(page_cache);
771 
772             journal_cjf_page_cache_free(page_cache);
773         }
774         */
775     }
776 
777     u64_set_destroy(page_cache_set);
778 }
779 
780 void
journal_cjf_page_cache_flush(file_pool_file_t file)781 journal_cjf_page_cache_flush(file_pool_file_t file)
782 {
783     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
784     u64_set* page_cache_set = journal_cjf_page_cache_set_from_file(file);
785     if(page_cache_set != NULL)
786     {
787         journal_cjf_page_cache_items_flush(page_cache_set);
788     }
789     else
790     {
791         log_warn("cjf: %s: is not cached", file_pool_filename(file));
792     }
793 
794     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
795 }
796 
797 void
journal_cjf_page_cache_flush_page(file_pool_file_t file,u64 file_offset)798 journal_cjf_page_cache_flush_page(file_pool_file_t file, u64 file_offset)
799 {
800     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
801 
802     journal_cjf_page_cache_item *page_cache = journal_cjf_page_cache_from_file(file, file_offset);
803 
804     if(page_cache != NULL)
805     {
806         journal_cjf_page_cache_item_flush(page_cache);
807     }
808 
809     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
810 }
811 
812 void
journal_cjf_page_cache_clear(file_pool_file_t file,u64 file_offset)813 journal_cjf_page_cache_clear(file_pool_file_t file, u64 file_offset)
814 {
815     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
816 
817     journal_cjf_page_cache_delete_from_file_and_offset(file, file_offset);
818 
819     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
820 }
821 
822 void
journal_cjf_page_cache_close(file_pool_file_t file)823 journal_cjf_page_cache_close(file_pool_file_t file)
824 {
825     group_mutex_lock(&page_cache_mtx, GROUP_MUTEX_WRITE);
826 
827     u64_set *page_cache_set = journal_cjf_page_cache_set_from_file(file);
828 
829     if(page_cache_set != NULL)
830     {
831         // destroy the u64_set content
832         journal_cjf_page_cache_items_close(page_cache_set);
833 
834         // delete the file_node
835         ptr_set_delete(&page_cache_item_by_file, file);
836     }
837     group_mutex_unlock(&page_cache_mtx, GROUP_MUTEX_WRITE);
838 }
839 
840 static void
journal_cjf_page_cache_finalize_cb(ptr_node * file_node)841 journal_cjf_page_cache_finalize_cb(ptr_node *file_node)
842 {
843     u64_set *page_cache_set = (u64_set*)&file_node->value;
844     journal_cjf_page_cache_items_close(page_cache_set);
845 }
846 
847 void
journal_cjf_page_cache_finalize()848 journal_cjf_page_cache_finalize()
849 {
850     group_mutex_write_lock(&page_cache_mtx);
851     ptr_set_callback_and_destroy(&page_cache_item_by_file, journal_cjf_page_cache_finalize_cb);
852     journal_cjf_page_mru_clear();
853     group_mutex_write_unlock(&page_cache_mtx);
854 }
855 
856 #endif
857 
858 /** @} */
859