/*------------------------------------------------------------------------------
 *
 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
 * The YADIFA TM software product is provided under the BSD 3-clause license:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *        * Redistributions of source code must retain the above copyright
 *          notice, this list of conditions and the following disclaimer.
 *        * Redistributions in binary form must reproduce the above copyright
 *          notice, this list of conditions and the following disclaimer in the
 *          documentation and/or other materials provided with the distribution.
 *        * Neither the name of EURid nor the names of its contributors may be
 *          used to endorse or promote products derived from this software
 *          without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *------------------------------------------------------------------------------
 *
 */

/** @defgroup streaming Streams
 *  @ingroup dnscore
 *  @brief Buffered file: a file_t wrapper backed by a shared page cache
 *
 * @{
 *
 *----------------------------------------------------------------------------*/

#define __BUFFERED_FILE_C__ 1

#include "dnscore/dnscore-config.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include "dnscore/ptr_set.h"
#include "dnscore/mutex.h"
#include "dnscore/list-dl.h"
#include "dnscore/fdtools.h"
#include "dnscore/logger.h"
#include "dnscore/file.h"
#include "dnscore/u64_set.h"
#include "dnscore/timems.h"

#define BUFFERED_STATISTICS_ON_STDOUT 0
#define IGNORE_COST 0

#if BUFFERED_STATISTICS_ON_STDOUT
#include "dnscore/format.h"
#endif

#define MODULE_MSG_HANDLE g_system_logger

#define BUFFERED_FILE_TAG 0x454c4946524642           // BFRFILE_TAG
#define BUFFERED_FILE_PAGE_TAG 0x50454c4946524642    // BFRFILEP_TAG
#define BUFFERED_FILE_CACHE_TAG 0x43454c4946524642   // BFRFILEC_TAG

#define PAGE_USE_OLD_ENOUGH_VALUE_US 500000LL        // 0.5s

struct buffered_file_t_;

typedef struct buffered_file_t_* buffered_file_t;

struct buffered_file_page
{
    struct list_dl_node_s *next;
    struct list_dl_node_s *prev;

    buffered_file_t file;
    u8 *buffer;
    s64 position;               // position of the page in the file (multiple of the page granularity)
    s64 timestamp;              // last use (so it does not exit the MRU too fast, except to be given to the same id)
    s64 cost;                   // the time in us that was required to build that page (IOs)
    s32 size;
    s32 written_from;           // if written_from < written_to_plus_one then the page is dirty and needs to be written
    s32 written_to_plus_one;
    s32 read_to;                // number of bytes in that page (usually = size unless it's the last one of the file)
    int index;                  // for sorting
    bool in_mru;
};
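
/*
 * Illustration (not part of the build): how the dirty range is tracked, based
 * on the init/update and write paths below. The numbers are purely illustrative.
 *
 *   buffered_file_cache_page_init(page, f, buffer, 4096, 0);
 *       // written_from = 4096, written_to_plus_one = 0 -> clean
 *   // after copying bytes into [100, 200) of the page buffer:
 *   //   written_from = 100, written_to_plus_one = 200 -> dirty, 100 bytes to flush
 *   // flushing writes buffer[100..199] at file offset page->position + 100
 */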

typedef struct buffered_file_page buffered_file_page;

typedef struct buffered_file_page* buffered_file_page_t;

struct buffered_file_page_set
{
    group_mutex_t mtx;
    u64_set offset_to_page;
};

typedef struct buffered_file_page_set buffered_file_page_set;

struct buffered_file_cache_t_
{
    group_mutex_t mtx;
    ptr_set id_to_page_set;
    void *page_pool;
    size_t page_pool_size;
    s64 granularity_mask;
    list_dl_s mru;
    list_dl_s avail;
    const char *name;
    s32 rc;
    u8 log2_page_size;
    bool source_is_mmap;

#if BUFFERED_STATISTICS_ON_STDOUT
    u32 cache_acquired;
    u32 cache_released;
    u32 cache_reclaimed;
    u32 cache_denied;
    u32 cache_denied_nolock;
    u32 cache_denied_cost;
    u32 cache_denied_writefail;
#endif
};

typedef struct buffered_file_cache_t_* buffered_file_cache_t;

struct buffered_file_t_
{
    const struct file_vtbl *vtbl;
    file_t buffered_file;               // the file covered by the cache
    buffered_file_cache_t page_cache;   // the cache used for this file
    buffered_file_page_set page_set;    // the set of pages covering the file (offset -> page)
    s64 position_current;               // the position where the file pointer really is
    s64 position_requested;             // the position where the file pointer is supposed to be
    s64 size;                           // the size of the file
};
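
/*
 * Illustration (not part of the build): how granularity_mask splits a file
 * offset into a page position and an offset inside the page, as done at the
 * top of buffered_file_read() and buffered_file_write(). Values are examples.
 *
 *   log2_granularity = 12 -> page_size = 4096, granularity_mask = 0xfff
 *   offset 10000:
 *     page_position = 10000 & ~0xfff = 8192   (the page covering the offset)
 *     in_page_from  = 10000 &  0xfff = 1808   (position inside that page)
 */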

#include <dnscore/buffered-file.h> // just to match the function signatures

static void
buffered_file_cache_page_init(buffered_file_page_t page, buffered_file_t f, u8 *buffer, s32 size, int index)
{
    assert((buffer != NULL) && (size > 0));

    page->file = f;
    page->buffer = buffer;
    page->position = 0;
    page->timestamp = 0;
    page->cost = 0;
    page->written_from = size;
    page->written_to_plus_one = 0;
    page->read_to = 0;
    page->size = size;
    page->index = index;
    page->in_mru = FALSE;
}

static buffered_file_page_t
buffered_file_cache_page_new_instance(u8 *buffer, s32 size, int index)
{
    buffered_file_page_t page;
    ZALLOC_OBJECT_OR_DIE(page, struct buffered_file_page, BUFFERED_FILE_PAGE_TAG);
    buffered_file_cache_page_init(page, NULL, buffer, size, index);
    return page;
}

static void
buffered_file_cache_page_update(buffered_file_page_t page, buffered_file_t f, s64 position)
{
    page->file = f;
    page->position = position;
    page->timestamp = 0;
    page->cost = 0;
    page->written_from = page->size;
    page->written_to_plus_one = 0;
    page->read_to = 0;
}

buffered_file_cache_t
buffered_file_cache_new_instance(const char *name, u32 count, u8 log2_granularity, bool use_mmap)
{
    if((log2_granularity < 4) || (log2_granularity > 20)) // 16 bytes to 1 MB
    {
        return NULL;
    }

    size_t page_size = 1LLU << log2_granularity;
    size_t total_size = page_size * count;
    u8 *pages;

    if(use_mmap)
    {
        pages = (u8*)mmap(NULL, total_size, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);

        if(pages == (u8*)MAP_FAILED)
        {
            return NULL;
        }
    }
    else
    {
        MALLOC_OBJECT_ARRAY(pages, u8, total_size, BUFFERED_FILE_CACHE_TAG);
        //pages = (u8*)malloc(total_size);

        if(pages == NULL)
        {
            return NULL;
        }
    }

    buffered_file_cache_t fc;

    ZALLOC_OBJECT_OR_DIE(fc, struct buffered_file_cache_t_, BUFFERED_FILE_CACHE_TAG);

#if BUFFERED_STATISTICS_ON_STDOUT
    memset(fc, 0, sizeof(*fc));
#endif

    group_mutex_init(&fc->mtx);
    fc->page_pool = pages;
    fc->page_pool_size = total_size;
    fc->granularity_mask = page_size - 1;
    list_dl_init(&fc->mru);
    list_dl_init(&fc->avail);
    fc->name = strdup(name);
    fc->source_is_mmap = use_mmap;
    fc->log2_page_size = log2_granularity;
    fc->rc = 1;

    for(u32 i = 0; i < count; ++i)
    {
        list_dl_append_node(&fc->avail, (list_dl_node_s*)buffered_file_cache_page_new_instance(&pages[page_size * i], page_size, i));
    }

#if DEBUG
    log_info("buffered_file_cache_new_instance('%s', %u, %hhu, %i) = %p", STRNULL(name), count, log2_granularity, use_mmap, fc);
#endif

    return fc;
}
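
/*
 * Usage sketch (illustrative, not part of the build): a cache meant to be
 * shared by several buffered files. The name and sizes below are examples.
 *
 *   // 256 pages of 4 KB each (log2_granularity = 12), backed by an anonymous mmap
 *   buffered_file_cache_t cache = buffered_file_cache_new_instance("journal-cache", 256, 12, TRUE);
 *   if(cache == NULL)
 *   {
 *       // allocation failed or granularity out of the [4, 20] range
 *   }
 *   ...
 *   buffered_file_cache_delete(cache); // drops the creator's reference
 */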

static void
buffered_file_cache_acquire(buffered_file_cache_t fc)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    ++fc->rc;
#if DEBUG
    log_info("buffered_file_cache_acquire(%p '%s') = %i", fc, STRNULL(fc->name), fc->rc);
#endif
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}

static void
buffered_file_cache_release(buffered_file_cache_t fc)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    s32 n = --fc->rc;
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);

#if DEBUG
    log_info("buffered_file_cache_release(%p '%s') = %i", fc, STRNULL(fc->name), n);
#endif

#if BUFFERED_STATISTICS_ON_STDOUT
    formatln("cache: acquired=%u released=%u reclaimed=%u denied=%u denied-nolock=%u denied-cost=%u denied-writefail=%u",
             fc->cache_acquired, fc->cache_released, fc->cache_reclaimed, fc->cache_denied,
             fc->cache_denied_nolock, fc->cache_denied_cost, fc->cache_denied_writefail);
    flushout();
#endif

    if(n == 0)
    {
        // the mru should be empty
        // the avail should be full

        if(fc->source_is_mmap)
        {
            munmap(fc->page_pool, fc->page_pool_size);
        }
        else
        {
            free(fc->page_pool);
        }
    }
}

void
buffered_file_cache_delete(buffered_file_cache_t fc)
{
#if DEBUG
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_READ);
    if(fc->rc > 1)
    {
        log_warn("buffered_file_cache_delete(%p '%s') rc=%i", fc, STRNULL(fc->name), fc->rc);
    }
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_READ);
#endif
    buffered_file_cache_release(fc);
}

/**
 * Looks for a victim page among the ones available.
 * If there are none, tries to take the last page of the MRU from the file using it (flushing it if needed).
 * This last step can fail if the page is too young relative to its IO cost, or if the page set is locked.
 *
 * @param fc
 *
 * @return a page or NULL if none were available
 */

static buffered_file_page_t
buffered_file_cache_reclaim_page_nolock(buffered_file_cache_t fc)
{
    // if the least recently used page was last used long enough ago (more than its IO cost, e.g. ~0.5 seconds), try to reclaim it
    // if it cannot be locked, give up
    // if it can, flush it, take it from its current user and return it

    buffered_file_page_t page = (buffered_file_page_t)list_dl_last_node(&fc->mru);

    if(page == NULL)
    {
        return NULL;
    }

    if(group_mutex_trylock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE))
    {
        s64 now = timeus();
#if !IGNORE_COST
        if(now - page->timestamp > page->cost)
#else
        if(TRUE)
#endif
        {
            s32 page_content = page->written_to_plus_one - page->written_from;

            if(page_content > 0)
            {
                // flush page
                /*
                s64 stored_position = page->file->position_current;
                */
                s64 write_position = page->position + page->written_from;
                write_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, write_position, SEEK_SET);
                if(ISOK(write_position))
                {
                    page->file->position_current = write_position;

                    s32 ret = page->file->buffered_file->vtbl->write(page->file->buffered_file, &page->buffer[page->written_from], page_content);
                    if(ISOK(ret) && (ret == page_content))
                    {
                        page->file->position_current += ret;

                        // page flushed
                        u64_set_delete(&page->file->page_set.offset_to_page, page->position);

                        if(page->in_mru)
                        {
                            list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
                            page->in_mru = FALSE;
                        }

#if BUFFERED_STATISTICS_ON_STDOUT
                        ++fc->cache_reclaimed;
#endif
                        /*
                        s64 restored_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, stored_position, SEEK_SET);
                        assert(restored_position == stored_position);
                        */

                        group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
                    }
                    else // could not properly write it
                    {
#if BUFFERED_STATISTICS_ON_STDOUT
                        ++fc->cache_denied_writefail;
#endif
                        /*
                        s64 restored_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, stored_position, SEEK_SET);
                        assert(restored_position == stored_position);
                        */

                        group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);

                        page = NULL;
                    }
                }
                else
                {
                    group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
                    page = NULL;
                }
            }
            else
            {
                // page does not need to be flushed
                u64_set_delete(&page->file->page_set.offset_to_page, page->position);
                group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
            }
        }
        else
        {
#if BUFFERED_STATISTICS_ON_STDOUT
            ++fc->cache_denied_cost;
#endif
            group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
            page = NULL;
        }
    }
#if BUFFERED_STATISTICS_ON_STDOUT
    else
    {
        ++fc->cache_denied_nolock;
    }
#endif

    return page;
}
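
/*
 * Illustration (not part of the build) of the ageing check above, with made-up
 * numbers: a page whose content took 300 us of IO to load gets cost = 600 us
 * (see the "* 2" when the cost is computed in the read/write paths), so it can
 * only be reclaimed once at least 600 us have passed since its last use.
 * Expensive pages therefore stay cached longer than cheap ones.
 */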
/**
 *
 * Returns a page covering the given position of a file, reclaiming one from the cache if needed.
 *
 * @param fc
 * @param file
 * @param position
 * @return the page, or NULL if none could be obtained
 */

static buffered_file_page_t
buffered_file_cache_acquire_page(buffered_file_cache_t fc, buffered_file_t file, s64 position)
{
    buffered_file_page_t page;

    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);

    if(list_dl_size(&fc->avail) == 0)
    {
        // find a victim

        if((page = buffered_file_cache_reclaim_page_nolock(fc)) != NULL)
        {
            buffered_file_cache_page_update(page, file, position);
        }
    }
    else
    {
        // take the first available one

        page = (buffered_file_page_t)list_dl_remove_first_node(&fc->avail);
        buffered_file_cache_page_update(page, file, position);
    }

#if BUFFERED_STATISTICS_ON_STDOUT
    if(page != NULL)
    {
        ++fc->cache_acquired;
    }
    else
    {
        ++fc->cache_denied;
    }
#endif

    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);

    return page;
}

void
buffered_file_cache_release_page(buffered_file_cache_t fc, buffered_file_page_t page)
{
    //u64_set_delete(page->file->page_set.offset_to_page, page->position);
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);

#if BUFFERED_STATISTICS_ON_STDOUT
    ++fc->cache_released;
#endif

    if(page->in_mru)
    {
        list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
        page->in_mru = FALSE;
    }
    page->file = NULL;
    list_dl_insert_node(&fc->avail, (list_dl_node_s*)page);
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}

void
buffered_file_cache_set_page_as_most_recently_used(buffered_file_cache_t fc, buffered_file_page_t page)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    if(page->in_mru)
    {
        list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
    }
    list_dl_insert_node(&fc->mru, (list_dl_node_s*)page);
    page->in_mru = TRUE;
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}
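
/*
 * Note on the MRU ordering (derived from the code above, relying on
 * list_dl_insert_node() putting the node at the head of the list): the head of
 * fc->mru is the most recently used page and the tail is the least recently
 * used one, which is why buffered_file_cache_reclaim_page_nolock() picks its
 * victim with list_dl_last_node(&fc->mru).
 */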

struct io_range
{
    s64 from;
    s64 to;
    buffered_file_page_t page;
};

typedef struct io_range io_range;

static ssize_t
buffered_file_read(file_t f, void *buffer_, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    // check whether the page holding the requested position (where we will be reading from) is in the cache
    // if not, move to that position, acquire a page and fill it from the file
    // if no page can be acquired, do a direct read instead
    // then copy the bytes from the page to the buffer

    u8 *buffer = (u8*)buffer_;
    u8 *buffer_org = buffer;

    buffered_file_page_t page = NULL;

    s64 page_position = bf->position_requested & ~bf->page_cache->granularity_mask;
    s64 in_page_from = bf->position_requested & bf->page_cache->granularity_mask;

    // we know the page_position and the position in the page

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    for(;;) // until there are no more bytes to read
    {
        // see if the page is cached

        u64_node *node = u64_set_find(&bf->page_set.offset_to_page, page_position);

        if(node == NULL)
        {
            // the page is not cached

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            // acquire a page to use for caching

            page = buffered_file_cache_acquire_page(bf->page_cache, bf, page_position);

            if(page == NULL)
            {
                // the cache is overused: taking a page now would be counter-productive and turn the cache into a glorified intermediary buffer

                if(bf->position_current != bf->position_requested)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);

                    if(FAIL(ret))
                    {
                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // read until the next cached page

                s64 to_read = size;
                s64 next_page_position = page_position;

                for(;;)
                {
                    next_page_position += bf->page_cache->granularity_mask + 1;

                    if(next_page_position >= bf->position_current + size)
                    {
                        break;
                    }

                    if(u64_set_find(&bf->page_set.offset_to_page, next_page_position) != NULL)
                    {
                        // a page exists at that position
                        to_read = next_page_position - bf->position_current;
                        break;
                    }
                }

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, buffer, to_read);

                if(ISOK(ret))
                {
                    bf->position_current += ret;
                    bf->position_requested = bf->position_current;
                    buffer += ret;
                    size -= ret;

                    if((size == 0) && (ret == to_read))
                    {
                        ssize_t total = buffer - buffer_org;

                        return total;
                    }

                    page_position = next_page_position;
                    in_page_from = 0;

                    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

                    continue;
                }
                else
                {
                    // on error, return the number of bytes already read so they are not lost, else the error code

                    ssize_t total = buffer - buffer_org;
                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }
            }

            s64 cost_computation_begin = timeus();

            s64 end_avail = bf->size - page_position;

            if(end_avail > 0)
            {
                if(bf->position_current != page_position)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, page_position, SEEK_SET);

                    if(FAIL(ret))
                    {
                        buffered_file_cache_release_page(bf->page_cache, page);

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // fill the page

                size_t expected_size = MIN(end_avail, page->size);

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, page->buffer, expected_size);

                if(FAIL(ret))
                {
                    buffered_file_cache_release_page(bf->page_cache, page);

                    ssize_t total = buffer - buffer_org;

                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }

                bf->position_current += ret;
                page->read_to = ret;
            }
            else
            {
                page->read_to = 0;
            }

            s64 cost_computation_end = timeus();

            // update the cost

            page->cost = (cost_computation_end - cost_computation_begin) * 2;

            group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            page->timestamp = MAX_S64;
            u64_node *page_node = u64_set_insert(&bf->page_set.offset_to_page, page->position);
            page_node->value = page;
        }
        else
        {
            page = (buffered_file_page_t)node->value;
        }

        // copy from the page

        ssize_t available_in_page = page->read_to - in_page_from;

        ssize_t n = MIN(available_in_page, size);
        memcpy(buffer, &page->buffer[in_page_from], n);

        bf->position_requested += n;
        page->timestamp = timeus();

        size -= n;

        // move the page to the top of the MRU

        buffered_file_cache_set_page_as_most_recently_used(bf->page_cache, page);

        buffer += n;

        // if size == 0 the job is done
        // if page->read_to != page->size this is the end of the file

        if((size == 0) || (page->read_to != page->size))
        {
            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);
            break;
        }

        // at this point, bf->position_requested & bf->page_cache->granularity_mask should be 0

        assert((bf->position_requested & bf->page_cache->granularity_mask) == 0);

        page_position += page->size;
        in_page_from = 0;
        page = NULL;
    }

    ssize_t total = buffer - buffer_org;

    return total;
}

static ssize_t
buffered_file_write(file_t f, const void *buffer_, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    // check whether the page holding the requested position (where we will be writing to) is in the cache
    // if not, move to that position, acquire a page and fill it from the file (if the position is outside the file, just fill with zeroes)
    // if no page can be acquired, do a direct write instead
    // then copy the bytes from the buffer to the page and update the written range

    const u8 *buffer = (const u8*)buffer_;
    const u8 *buffer_org = buffer;

    buffered_file_page_t page = NULL;

    s64 page_position = bf->position_requested & ~bf->page_cache->granularity_mask;
    s64 in_page_from = bf->position_requested & bf->page_cache->granularity_mask;

    // we know the page_position and the position in the page

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    for(;;) // until there are no more bytes to write
    {
        // see if the page is cached

        u64_node *node = u64_set_find(&bf->page_set.offset_to_page, page_position);

        if(node == NULL)
        {
            // the page is not cached

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            // acquire a page to use for caching

            page = buffered_file_cache_acquire_page(bf->page_cache, bf, page_position);

            if(page == NULL)
            {
                // the cache is overused: taking a page now would be counter-productive and turn the cache into a glorified intermediary buffer

                if(bf->position_current != bf->position_requested)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);

                    if(FAIL(ret))
                    {
                        // note: no page to release here (page is NULL on this path)

                        if(bf->position_requested > bf->size)
                        {
                            bf->size = bf->position_requested;
                        }

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // write until the next cached page

                s64 to_write = size;
                s64 next_page_position = page_position;

                for(;;)
                {
                    next_page_position += bf->page_cache->granularity_mask + 1;

                    if(next_page_position >= bf->position_current + size)
                    {
                        break;
                    }

                    if(u64_set_find(&bf->page_set.offset_to_page, next_page_position) != NULL)
                    {
                        // a page exists at that position
                        to_write = next_page_position - bf->position_current;
                        break;
                    }
                }

                ssize_t ret = bf->buffered_file->vtbl->write(bf->buffered_file, buffer, to_write);

                if(ISOK(ret))
                {
                    bf->position_current += ret;
                    bf->position_requested = bf->position_current;
                    buffer += ret;
                    size -= ret;

                    if(bf->position_requested > bf->size)
                    {
                        bf->size = bf->position_requested;
                    }

                    if((size == 0) && (ret == to_write))
                    {
                        ssize_t total = buffer - buffer_org;

                        return total;
                    }

                    page_position = next_page_position;
                    in_page_from = 0;

                    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

                    continue;
                }
                else
                {
                    // on error, return the number of bytes already written so they are not lost, else the error code

                    if(bf->position_current > bf->size)
                    {
                        bf->size = bf->position_current;
                    }

                    ssize_t total = buffer - buffer_org;
                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }
            }

            s64 cost_computation_begin = timeus();

            s64 end_avail = bf->size - page_position;

            if(end_avail > 0)
            {
                if(bf->position_current != page_position)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, page_position, SEEK_SET);

                    if(FAIL(ret))
                    {
                        if(bf->position_current > bf->size)
                        {
                            bf->size = bf->position_current;
                        }

                        buffered_file_cache_release_page(bf->page_cache, page);

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // fill the page

                size_t expected_size = MIN(end_avail, page->size);

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, page->buffer, expected_size);

                if(FAIL(ret))
                {
                    if(bf->position_current > bf->size)
                    {
                        bf->size = bf->position_current;
                    }

                    buffered_file_cache_release_page(bf->page_cache, page);

                    ssize_t total = buffer - buffer_org;

                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }

                bf->position_current += ret; // keep the cached position in sync with the underlying read (as buffered_file_read() does)
                page->read_to = ret;
            }
            else
            {
                page->read_to = 0;
            }

            s64 cost_computation_end = timeus();

            // update the cost

            page->cost = (cost_computation_end - cost_computation_begin) * 2;

            group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            page->timestamp = MAX_S64;
            u64_node *page_node = u64_set_insert(&bf->page_set.offset_to_page, page->position);
            page_node->value = page;
        }
        else
        {
            page = (buffered_file_page_t)node->value;
        }

        // copy to the page

        ssize_t available_in_page = page->size - in_page_from; // note: size, not read_to

        ssize_t n = MIN(available_in_page, size);
        memcpy(&page->buffer[in_page_from], buffer, n);

        if(page->written_from > in_page_from)
        {
            page->written_from = in_page_from;
        }

        if(page->written_to_plus_one < in_page_from + n)
        {
            page->written_to_plus_one = in_page_from + n;
        }

        if(page->read_to < in_page_from + n)
        {
            page->read_to = in_page_from + n;
        }

        bf->position_requested += n;
        page->timestamp = timeus();

        size -= n;

        // move the page to the top of the MRU

        buffered_file_cache_set_page_as_most_recently_used(bf->page_cache, page);

        buffer += n;

        // if size == 0 the job is done
        // if page->read_to != page->size this is the end of the file

        if((size == 0) || (page->read_to != page->size))
        {
            if(bf->position_requested > bf->size)
            {
                bf->size = bf->position_requested;
            }

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);
            break;
        }

        // at this point, bf->position_requested & bf->page_cache->granularity_mask should be 0

        assert((bf->position_requested & bf->page_cache->granularity_mask) == 0);

        page_position += page->size;
        in_page_from = 0;
        page = NULL;
    }

    ssize_t total = buffer - buffer_org;

    return total;
}
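
/*
 * Note (derived from the write/flush/reclaim paths above): writes are
 * write-behind. Bytes copied into a cached page only reach the underlying
 * file when the page is flushed, either by buffered_file_flush()/close, or
 * when the page is reclaimed for another offset. Until then, bf->size already
 * reflects the written data while the underlying file may not.
 */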

static ssize_t
buffered_file_seek(file_t f, ssize_t position, int whence)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    switch(whence)
    {
        case SEEK_SET:
        {
            bf->position_requested = position;
            return position;
        }
        case SEEK_CUR:
        {
            if(bf->position_requested + position >= 0)
            {
                bf->position_requested += position;
                return bf->position_requested;
            }
            else
            {
                bf->position_requested = 0;
                return 0;
            }
        }
        case SEEK_END:
        {
            if(bf->size + position >= 0)
            {
                bf->position_requested = bf->size + position;
                return bf->position_requested;
            }
            else
            {
                bf->position_requested = 0;
                return 0;
            }
        }
        default:
        {
            return INVALID_ARGUMENT_ERROR;
        }
    }
}
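
/*
 * Note (derived from the code above): seeking only updates position_requested;
 * the underlying file pointer is moved lazily by the next read, write or flush.
 * Negative target positions are clamped to 0 for SEEK_CUR and SEEK_END.
 */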

static ssize_t
buffered_file_tell(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    return bf->position_requested;
}

static ya_result
buffered_file_flush(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    ssize_t ret = 0;
    bool moved = FALSE;

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    u64_set_iterator iter;
    u64_set_iterator_init(&bf->page_set.offset_to_page, &iter);
    while(u64_set_iterator_hasnext(&iter))
    {
        u64_node *node = u64_set_iterator_next_node(&iter);
        buffered_file_page *page = (buffered_file_page*)node->value;
        if(page->written_from <= page->written_to_plus_one)
        {
            // move to the position
            // write the bytes

            ssize_t target = page->position + page->written_from;
            ret = bf->buffered_file->vtbl->seek(bf->buffered_file, target, SEEK_SET);
            if(ret != target)
            {
                if(ret >= 0) // the returned value does not match the expected position
                {
                    ret = INVALID_STATE_ERROR;
                }

                break;
            }

            size_t size = (size_t)page->written_to_plus_one - page->written_from;

            ret = bf->buffered_file->vtbl->write(bf->buffered_file, &page->buffer[page->written_from], size);

            if(ret != (ssize_t)size)
            {
                if(ret >= 0) // the number of bytes does not match the expected written amount
                {
                    ret = INVALID_STATE_ERROR;
                }

                break;
            }

            page->written_from = size;
            page->written_to_plus_one = 0;

            moved = TRUE;
        }
    }

    group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    if(moved)
    {
        bf->position_current = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);
    }

    return ret;
}

static int
buffered_file_close(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    buffered_file_flush(f);

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    u64_set_iterator iter;
    u64_set_iterator_init(&bf->page_set.offset_to_page, &iter);
    while(u64_set_iterator_hasnext(&iter))
    {
        u64_node *node = u64_set_iterator_next_node(&iter);
        buffered_file_page *page = (buffered_file_page*)node->value;

        buffered_file_cache_release_page(bf->page_cache, page);
    }
    u64_set_destroy(&bf->page_set.offset_to_page);

    group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    bf->buffered_file->vtbl->close(bf->buffered_file);

    buffered_file_cache_release(bf->page_cache);

    bf->page_cache = NULL;
    bf->vtbl = NULL;
    ZFREE_OBJECT(bf);
    return SUCCESS;
}

static ssize_t
buffered_file_size(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    return bf->size;
}

static int
buffered_file_resize(file_t f, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    if(size < 0)
    {
        return INVALID_ARGUMENT_ERROR;
    }

    int ret = file_resize(bf->buffered_file, size);

    if(ret >= 0)
    {
        bf->size = size;
    }

    return ret;
}

static const struct file_vtbl buffered_file_vtbl =
{
    buffered_file_read,
    buffered_file_write,
    buffered_file_seek,
    buffered_file_tell,
    buffered_file_flush,
    buffered_file_close,
    buffered_file_size,
    buffered_file_resize
};

ya_result
buffered_file_init(file_t *fp, file_t file_to_buffer_, buffered_file_cache_t fc)
{
    if(fp == NULL || file_to_buffer_ == NULL || fc == NULL)
    {
        return UNEXPECTED_NULL_ARGUMENT_ERROR;
    }

    file_t file_to_buffer = file_to_buffer_;

    buffered_file_t bf;
    ZALLOC_OBJECT_OR_DIE(bf, struct buffered_file_t_, BUFFERED_FILE_TAG);
    bf->vtbl = &buffered_file_vtbl;
    bf->buffered_file = file_to_buffer;
    buffered_file_cache_acquire(fc);
    bf->page_cache = fc;
    group_mutex_init(&bf->page_set.mtx);
    bf->page_set.offset_to_page = NULL;
    bf->position_current = file_to_buffer->vtbl->tell(file_to_buffer);
    bf->position_requested = bf->position_current;
    bf->size = file_to_buffer->vtbl->size(file_to_buffer);

    *fp = (file_t)bf;

    return SUCCESS;
}
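
/*
 * Usage sketch (illustrative, not part of the build). It assumes a file_t
 * opened elsewhere, e.g. through the filesystem-file layer; only the calls
 * defined in this module and the file_t vtbl it already relies on are used,
 * and "data"/"data_size" are placeholders.
 *
 *   buffered_file_cache_t cache = buffered_file_cache_new_instance("example", 64, 12, FALSE);
 *   file_t f = ...; // some already-opened file_t
 *   file_t bf;
 *   if(ISOK(buffered_file_init(&bf, f, cache)))
 *   {
 *       bf->vtbl->seek(bf, 0, SEEK_SET);
 *       bf->vtbl->write(bf, data, data_size);   // goes through the page cache
 *       bf->vtbl->flush(bf);                    // pushes dirty pages to f
 *       bf->vtbl->close(bf);                    // flushes, releases pages, closes f
 *   }
 *   buffered_file_cache_delete(cache);          // drops the creator's reference
 */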

/** @} */