/*------------------------------------------------------------------------------
 *
 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
 * The YADIFA TM software product is provided under the BSD 3-clause license:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *        * Redistributions of source code must retain the above copyright
 *          notice, this list of conditions and the following disclaimer.
 *        * Redistributions in binary form must reproduce the above copyright
 *          notice, this list of conditions and the following disclaimer in the
 *          documentation and/or other materials provided with the distribution.
 *        * Neither the name of EURid nor the names of its contributors may be
 *          used to endorse or promote products derived from this software
 *          without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *------------------------------------------------------------------------------
 *
 */

/** @defgroup streaming Streams
 *  @ingroup dnscore
 *  @brief Buffered file: a file_t wrapper backed by a shared page cache
 *
 *  Wraps a file_t with a cache of fixed-size pages that can be shared between
 *  several files. Reads and writes go through the cached pages; dirty pages
 *  are written back when they are reclaimed, flushed or the file is closed.
 *
 * @{
 *
 *----------------------------------------------------------------------------*/

#define __BUFFERED_FILE_C__ 1

#include "dnscore/dnscore-config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>     // memcpy(), memset(), strdup()
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include "dnscore/ptr_set.h"
#include "dnscore/mutex.h"
#include "dnscore/list-dl.h"
#include "dnscore/fdtools.h"
#include "dnscore/logger.h"
#include "dnscore/file.h"
#include "dnscore/u64_set.h"
#include "dnscore/timems.h"

#define BUFFERED_STATISTICS_ON_STDOUT 0
#define IGNORE_COST 0

#if BUFFERED_STATISTICS_ON_STDOUT
#include "dnscore/format.h"
#endif

#define MODULE_MSG_HANDLE g_system_logger

#define BUFFERED_FILE_TAG       0x454c4946524642   // BFRFILE_TAG
#define BUFFERED_FILE_PAGE_TAG  0x50454c4946524642 // BFRFILEP_TAG
#define BUFFERED_FILE_CACHE_TAG 0x43454c4946524642 // BFRFILEC_TAG

#define PAGE_USE_OLD_ENOUGH_VALUE_US 500000LL // 0.5s

struct buffered_file_t_;

typedef struct buffered_file_t_* buffered_file_t;

struct buffered_file_page
{
    struct list_dl_node_s *next;
    struct list_dl_node_s *prev;

    buffered_file_t file;
    u8 *buffer;
    s64 position;   // position of the page in the file (a multiple of the page granularity)
    s64 timestamp;  // last use (so it does not exit the MRU too fast, except to be given back to the same user)
    s64 cost;       // the time in us that was required to build that page (IOs)
    s32 size;
    s32 written_from; // if written_from < written_to_plus_one then the page holds dirty bytes that need to be written
    s32 written_to_plus_one;
    s32 read_to;    // number of valid bytes in that page (usually = size, unless it is the last page of the file)
    int index;      // for sorting
    bool in_mru;
};

typedef struct buffered_file_page buffered_file_page;

typedef struct buffered_file_page* buffered_file_page_t;

struct buffered_file_page_set
{
    group_mutex_t mtx;
    u64_set offset_to_page;
};

typedef struct buffered_file_page_set buffered_file_page_set;

struct buffered_file_cache_t_
{
    group_mutex_t mtx;
    ptr_set id_to_page_set;
    void *page_pool;
    size_t page_pool_size;
    s64 granularity_mask;
    list_dl_s mru;
    list_dl_s avail;
    const char *name;
    s32 rc;
    u8 log2_page_size;
    bool source_is_mmap;

#if BUFFERED_STATISTICS_ON_STDOUT
    u32 cache_acquired;
    u32 cache_released;
    u32 cache_reclaimed;
    u32 cache_denied;
    u32 cache_denied_nolock;
    u32 cache_denied_cost;
    u32 cache_denied_writefail;
#endif
};

typedef struct buffered_file_cache_t_* buffered_file_cache_t;

struct buffered_file_t_
{
    const struct file_vtbl *vtbl;
    file_t buffered_file;               // the file covered by the cache
    buffered_file_cache_t page_cache;   // the cache used for this file
    buffered_file_page_set page_set;    // the set of pages covering the file (offset -> page)
    s64 position_current;               // the position where the file pointer really is
    s64 position_requested;             // the position where the file pointer is supposed to be
    s64 size;                           // the size of the file
};

#include <dnscore/buffered-file.h>      // just to match the function signatures

static void
buffered_file_cache_page_init(buffered_file_page_t page, buffered_file_t f, u8 *buffer, s32 size, int index)
{
    assert((buffer != NULL) && (size > 0));

    page->file = f;
    page->buffer = buffer;
    page->position = 0;
    page->timestamp = 0;
    page->cost = 0;
    page->written_from = size;
    page->written_to_plus_one = 0;
    page->read_to = 0;
    page->size = size;
    page->index = index;
    page->in_mru = FALSE;
}

static buffered_file_page_t
buffered_file_cache_page_new_instance(u8 *buffer, s32 size, int index)
{
    buffered_file_page_t page;
    ZALLOC_OBJECT_OR_DIE(page, struct buffered_file_page, BUFFERED_FILE_PAGE_TAG);
    buffered_file_cache_page_init(page, NULL, buffer, size, index);
    return page;
}

static void
buffered_file_cache_page_update(buffered_file_page_t page, buffered_file_t f, s64 position)
{
    page->file = f;
    page->position = position;
    page->timestamp = 0;
    page->cost = 0;
    page->written_from = page->size;
    page->written_to_plus_one = 0;
    page->read_to = 0;
}

buffered_file_cache_t
buffered_file_cache_new_instance(const char *name, u32 count, u8 log2_granularity, bool use_mmap)
{
    if((log2_granularity < 4) || (log2_granularity > 20)) // pages from 16 bytes up to 1 MB
    {
        return NULL;
    }

    size_t page_size = 1LLU << log2_granularity;
    size_t total_size = page_size * count;
    u8 *pages;

    if(use_mmap)
    {
        pages = (u8*)mmap(NULL, total_size, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);

        if(pages == (u8*)MAP_FAILED)
        {
            return NULL;
        }
    }
    else
    {
        MALLOC_OBJECT_ARRAY(pages, u8, total_size, BUFFERED_FILE_CACHE_TAG);

        if(pages == NULL)
        {
            return NULL;
        }
    }

    buffered_file_cache_t fc;

    ZALLOC_OBJECT_OR_DIE(fc, struct buffered_file_cache_t_, BUFFERED_FILE_CACHE_TAG);

#if BUFFERED_STATISTICS_ON_STDOUT
    memset(fc, 0, sizeof(*fc));
#endif

    group_mutex_init(&fc->mtx);
    fc->page_pool = pages;
    fc->page_pool_size = total_size;
    fc->granularity_mask = page_size - 1;
    list_dl_init(&fc->mru);
    list_dl_init(&fc->avail);
    fc->name = strdup(name);
    fc->source_is_mmap = use_mmap;
    fc->log2_page_size = log2_granularity;
    fc->rc = 1;

    for(u32 i = 0; i < count; ++i)
    {
        list_dl_append_node(&fc->avail, (list_dl_node_s*)buffered_file_cache_page_new_instance(&pages[page_size * i], page_size, i));
    }

#if DEBUG
    log_info("buffered_file_cache_new_instance('%s', %u, %hhu, %i) = %p", STRNULL(name), count, log2_granularity, use_mmap, fc);
#endif

    return fc;
}
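
/*
 * Illustrative sizing example (hypothetical values, not part of the API):
 * a cache created with count = 256 and log2_granularity = 12 uses 4 KiB pages
 * and a 1 MiB pool.
 *
 *   buffered_file_cache_t fc = buffered_file_cache_new_instance("example", 256, 12, TRUE);
 *   // page_size = 1 << 12 = 4096 bytes, total_size = 256 * 4096 = 1 MiB
 *   // granularity_mask = 0xfff, so a file offset is split into
 *   //   page_position = offset & ~0xfff  (which page)
 *   //   in_page_from  = offset &  0xfff  (where inside the page)
 */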

static void
buffered_file_cache_acquire(buffered_file_cache_t fc)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    ++fc->rc;
#if DEBUG
    log_info("buffered_file_cache_acquire(%p '%s') = %i", fc, STRNULL(fc->name), fc->rc);
#endif
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}

static void
buffered_file_cache_release(buffered_file_cache_t fc)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    s32 n = --fc->rc;
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);

#if DEBUG
    log_info("buffered_file_cache_release(%p '%s') = %i", fc, STRNULL(fc->name), n);
#endif

#if BUFFERED_STATISTICS_ON_STDOUT
    formatln("cache: acquired=%u released=%u reclaimed=%u denied=%u denied-nolock=%u denied-cost=%u denied-writefail=%u",
            fc->cache_acquired, fc->cache_released, fc->cache_reclaimed, fc->cache_denied,
            fc->cache_denied_nolock, fc->cache_denied_cost, fc->cache_denied_writefail);
    flushout();
#endif

    if(n == 0)
    {
        // at this point the MRU should be empty and the avail list full again

        // free the page descriptors parked in the avail list

        while(list_dl_size(&fc->avail) > 0)
        {
            buffered_file_page_t page = (buffered_file_page_t)list_dl_remove_first_node(&fc->avail);
            ZFREE_OBJECT(page);
        }

        if(fc->source_is_mmap)
        {
            munmap(fc->page_pool, fc->page_pool_size);
        }
        else
        {
            free(fc->page_pool);
        }

        free((char*)fc->name);
        group_mutex_destroy(&fc->mtx);
        ZFREE_OBJECT(fc);
    }
}

void
buffered_file_cache_delete(buffered_file_cache_t fc)
{
#if DEBUG
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_READ);
    if(fc->rc > 1)
    {
        log_warn("buffered_file_cache_delete(%p '%s') rc=%i", fc, STRNULL(fc->name), fc->rc);
    }
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_READ);
#endif
    buffered_file_cache_release(fc);
}

/**
 * Looks for a victim page among the available ones.
 * If there are none, tries to take the last page of the MRU away from the file using it (flushing it first if needed).
 * This last step can fail if the page is too young relative to its IO cost, or if the page set is locked.
 *
 * @param fc
 *
 * @return a page, or NULL if none was available
 */
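
/*
 * Illustrative numbers (hypothetical): a page whose content took 150 us of IO
 * to load gets cost = 300 us (the measured time is doubled when the page is
 * filled, see the page->cost updates in the read and write paths below).
 * Unless IGNORE_COST is set, such a page can only be reclaimed once it has sat
 * unused for more than 300 us (now - page->timestamp > page->cost), so pages
 * that were expensive to build survive longer in the MRU.
 */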

static buffered_file_page_t
buffered_file_cache_reclaim_page_nolock(buffered_file_cache_t fc)
{
    // if the least recently used page has been used more than (e.g.) 0.5 seconds ago, try to reclaim it
    // if it cannot be locked, give up
    // if it can, flush it, take it from its current user and return it

    buffered_file_page_t page = (buffered_file_page_t)list_dl_last_node(&fc->mru);

    if(page == NULL)
    {
        return NULL;
    }

    if(group_mutex_trylock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE))
    {
        s64 now = timeus();
#if !IGNORE_COST
        if(now - page->timestamp > page->cost)
#else
        if(TRUE)
#endif
        {
            s32 page_content = page->written_to_plus_one - page->written_from;

            if(page_content > 0)
            {
                // flush the page
                /*
                s64 stored_position = page->file->position_current;
                */
                s64 write_position = page->position + page->written_from;
                write_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, write_position, SEEK_SET);
                if(ISOK(write_position))
                {
                    page->file->position_current = write_position;

                    s32 ret = page->file->buffered_file->vtbl->write(page->file->buffered_file, &page->buffer[page->written_from], page_content);
                    if(ISOK(ret) && (ret == page_content))
                    {
                        page->file->position_current += ret;

                        // page flushed
                        u64_set_delete(&page->file->page_set.offset_to_page, page->position);

                        if(page->in_mru)
                        {
                            list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
                            page->in_mru = FALSE;
                        }

#if BUFFERED_STATISTICS_ON_STDOUT
                        ++fc->cache_reclaimed;
#endif
                        /*
                        s64 restored_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, stored_position, SEEK_SET);
                        assert(restored_position == stored_position);
                        */

                        group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
                    }
                    else // could not properly write it
                    {
#if BUFFERED_STATISTICS_ON_STDOUT
                        ++fc->cache_denied_writefail;
#endif
                        /*
                        s64 restored_position = page->file->buffered_file->vtbl->seek(page->file->buffered_file, stored_position, SEEK_SET);
                        assert(restored_position == stored_position);
                        */

                        group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);

                        page = NULL;
                    }
                }
                else
                {
                    group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
                    page = NULL;
                }
            }
            else
            {
                // the page does not need to be flushed
                u64_set_delete(&page->file->page_set.offset_to_page, page->position);
                group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
            }
        }
        else
        {
#if BUFFERED_STATISTICS_ON_STDOUT
            ++fc->cache_denied_cost;
#endif
            group_mutex_unlock(&page->file->page_set.mtx, GROUP_MUTEX_WRITE);
            page = NULL;
        }
    }
#if BUFFERED_STATISTICS_ON_STDOUT
    else
    {
        ++fc->cache_denied_nolock;
    }
#endif

    return page;
}

/**
 * Returns a page set up for the given file at the given position.
 *
 * @param fc
 * @param file
 * @param position
 * @return a page, or NULL if the cache could not provide one
 */

static buffered_file_page_t
buffered_file_cache_acquire_page(buffered_file_cache_t fc, buffered_file_t file, s64 position)
{
    buffered_file_page_t page;

    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);

    if(list_dl_size(&fc->avail) == 0)
    {
        // find a victim

        if((page = buffered_file_cache_reclaim_page_nolock(fc)) != NULL)
        {
            buffered_file_cache_page_update(page, file, position);
        }
    }
    else
    {
        // take the first available one

        page = (buffered_file_page_t)list_dl_remove_first_node(&fc->avail);
        buffered_file_cache_page_update(page, file, position);
    }

#if BUFFERED_STATISTICS_ON_STDOUT
    if(page != NULL)
    {
        ++fc->cache_acquired;
    }
    else
    {
        ++fc->cache_denied;
    }
#endif

    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);

    return page;
}

void
buffered_file_cache_release_page(buffered_file_cache_t fc, buffered_file_page_t page)
{
    //u64_set_delete(page->file->page_set.offset_to_page, page->position);
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);

#if BUFFERED_STATISTICS_ON_STDOUT
    ++fc->cache_released;
#endif

    if(page->in_mru)
    {
        list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
        page->in_mru = FALSE;
    }
    page->file = NULL;
    list_dl_insert_node(&fc->avail, (list_dl_node_s*)page);
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}

void
buffered_file_cache_set_page_as_most_recently_used(buffered_file_cache_t fc, buffered_file_page_t page)
{
    group_mutex_lock(&fc->mtx, GROUP_MUTEX_WRITE);
    if(page->in_mru)
    {
        list_dl_remove_node(&fc->mru, (list_dl_node_s*)page);
    }
    list_dl_insert_node(&fc->mru, (list_dl_node_s*)page);
    page->in_mru = TRUE;
    group_mutex_unlock(&fc->mtx, GROUP_MUTEX_WRITE);
}

struct io_range
{
    s64 from;
    s64 to;
    buffered_file_page_t page;
};

typedef struct io_range io_range;

static ssize_t
buffered_file_read(file_t f, void *buffer_, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    // see if the page at the requested position (where we will be reading from) is in the cache
    // if not, move to that position, acquire a page and fill it from the file
    //   if no page can be acquired, do a direct read instead
    // then copy the bytes from the page to the buffer

    u8 *buffer = (u8*)buffer_;
    u8 *buffer_org = buffer;

    buffered_file_page_t page = NULL;

    s64 page_position = bf->position_requested & ~bf->page_cache->granularity_mask;
    s64 in_page_from = bf->position_requested & bf->page_cache->granularity_mask;

    // we know the page_position and the position in the page

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    for(;;) // until there are no more bytes to read
    {
        // see if the page is cached

        u64_node *node = u64_set_find(&bf->page_set.offset_to_page, page_position);

        if(node == NULL)
        {
            // the page is not cached

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            // acquire a page to use for caching

            page = buffered_file_cache_acquire_page(bf->page_cache, bf, page_position);

            if(page == NULL)
            {
                // the cache is overused: taking a page now would be counter-productive and
                // would reduce the cache to a glorified intermediary buffer, so read directly

                if(bf->position_current != bf->position_requested)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);

                    if(FAIL(ret))
                    {
                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // read until the next cached page

                s64 to_read = size;
                s64 next_page_position = page_position;

                for(;;)
                {
                    next_page_position += bf->page_cache->granularity_mask + 1;

                    if(next_page_position >= bf->position_current + size)
                    {
                        break;
                    }

                    if(u64_set_find(&bf->page_set.offset_to_page, next_page_position) != NULL)
                    {
                        // a page exists at that position
                        to_read = next_page_position - bf->position_current;
                        break;
                    }
                }

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, buffer, to_read);

                if(ISOK(ret))
                {
                    bf->position_current += ret;
                    bf->position_requested = bf->position_current;
                    buffer += ret;
                    size -= ret;

                    if((size == 0) && (ret == to_read))
                    {
                        ssize_t total = buffer - buffer_org;

                        return total;
                    }

                    page_position = next_page_position;
                    in_page_from = 0;

                    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

                    continue;
                }
                else
                {
                    // handle the error so bytes already read are not ignored

                    ssize_t total = buffer - buffer_org;
                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }
            }

            s64 cost_computation_begin = timeus();

            s64 end_avail = bf->size - page_position;

            if(end_avail > 0)
            {
                if(bf->position_current != page_position)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, page_position, SEEK_SET);

                    if(FAIL(ret))
                    {
                        buffered_file_cache_release_page(bf->page_cache, page);

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // fill the page

                size_t expected_size = MIN(end_avail, page->size);

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, page->buffer, expected_size);

                if(FAIL(ret))
                {
                    buffered_file_cache_release_page(bf->page_cache, page);

                    ssize_t total = buffer - buffer_org;

                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }

                bf->position_current += ret;
                page->read_to = ret;
            }
            else
            {
                page->read_to = 0;
            }

            s64 cost_computation_end = timeus();

            // update the cost

            page->cost = (cost_computation_end - cost_computation_begin) * 2;

            group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            page->timestamp = MAX_S64;
            u64_node *page_node = u64_set_insert(&bf->page_set.offset_to_page, page->position);
            page_node->value = page;
        }
        else
        {
            page = (buffered_file_page_t)node->value;
        }

        // copy from the page

        ssize_t available_in_page = page->read_to - in_page_from;

        ssize_t n = MIN(available_in_page, size);
        memcpy(buffer, &page->buffer[in_page_from], n);

        bf->position_requested += n;
        page->timestamp = timeus();

        size -= n;

        // move the page to the top of the MRU

        buffered_file_cache_set_page_as_most_recently_used(bf->page_cache, page);

        buffer += n;

        // if size == 0 the job is done
        // if page->read_to != page->size this is the end of the file

        if((size == 0) || (page->read_to != page->size))
        {
            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);
            break;
        }

        // at this point, bf->position_requested & bf->page_cache->granularity_mask should be 0

        assert((bf->position_requested & bf->page_cache->granularity_mask) == 0);

        page_position += page->size;
        in_page_from = 0;
        page = NULL;
    }

    ssize_t total = buffer - buffer_org;

    return total;
}

static ssize_t
buffered_file_write(file_t f, const void *buffer_, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    // see if the page at the requested position (where we will be writing to) is in the cache
    // if not, move to that position, acquire a page and fill it from the file (if the position is beyond the end of the file, the page simply starts empty)
    //   if no page can be acquired, do a direct write instead
    // then copy the bytes from the buffer to the page and update the written range

    const u8 *buffer = (const u8*)buffer_;
    const u8 *buffer_org = buffer;

    buffered_file_page_t page = NULL;

    s64 page_position = bf->position_requested & ~bf->page_cache->granularity_mask;
    s64 in_page_from = bf->position_requested & bf->page_cache->granularity_mask;

    // we know the page_position and the position in the page

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    for(;;) // until there are no more bytes to write
    {
        // see if the page is cached

        u64_node *node = u64_set_find(&bf->page_set.offset_to_page, page_position);

        if(node == NULL)
        {
            // the page is not cached

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            // acquire a page to use for caching

            page = buffered_file_cache_acquire_page(bf->page_cache, bf, page_position);

            if(page == NULL)
            {
                // the cache is overused: taking a page now would be counter-productive and
                // would reduce the cache to a glorified intermediary buffer, so write directly

                if(bf->position_current != bf->position_requested)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);

                    if(FAIL(ret))
                    {
                        // note: no page was acquired here (page == NULL), so there is nothing to release

                        if(bf->position_requested > bf->size)
                        {
                            bf->size = bf->position_requested;
                        }

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // write until the next cached page

                s64 to_write = size;
                s64 next_page_position = page_position;

                for(;;)
                {
                    next_page_position += bf->page_cache->granularity_mask + 1;

                    if(next_page_position >= bf->position_current + size)
                    {
                        break;
                    }

                    if(u64_set_find(&bf->page_set.offset_to_page, next_page_position) != NULL)
                    {
                        // a page exists at that position
                        to_write = next_page_position - bf->position_current;
                        break;
                    }
                }

                ssize_t ret = bf->buffered_file->vtbl->write(bf->buffered_file, buffer, to_write);

                if(ISOK(ret))
                {
                    bf->position_current += ret;
                    bf->position_requested = bf->position_current;
                    buffer += ret;
                    size -= ret;

                    if(bf->position_requested > bf->size)
                    {
                        bf->size = bf->position_requested;
                    }

                    if((size == 0) && (ret == to_write))
                    {
                        ssize_t total = buffer - buffer_org;

                        return total;
                    }

                    page_position = next_page_position;
                    in_page_from = 0;

                    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

                    continue;
                }
                else
                {
                    // handle the error so bytes already written are not ignored

                    if(bf->position_current > bf->size)
                    {
                        bf->size = bf->position_current;
                    }

                    ssize_t total = buffer - buffer_org;
                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }
            }

            s64 cost_computation_begin = timeus();

            s64 end_avail = bf->size - page_position;

            if(end_avail > 0)
            {
                if(bf->position_current != page_position)
                {
                    // move into the file at the requested position

                    ssize_t ret = bf->buffered_file->vtbl->seek(bf->buffered_file, page_position, SEEK_SET);

                    if(FAIL(ret))
                    {
                        if(bf->position_current > bf->size)
                        {
                            bf->size = bf->position_current;
                        }

                        buffered_file_cache_release_page(bf->page_cache, page);

                        ssize_t total = buffer - buffer_org;

                        if(total > 0)
                        {
                            ret = total;
                        }

                        return ret;
                    }

                    bf->position_current = ret;
                }

                // fill the page

                size_t expected_size = MIN(end_avail, page->size);

                ssize_t ret = bf->buffered_file->vtbl->read(bf->buffered_file, page->buffer, expected_size);

                if(FAIL(ret))
                {
                    if(bf->position_current > bf->size)
                    {
                        bf->size = bf->position_current;
                    }

                    buffered_file_cache_release_page(bf->page_cache, page);

                    ssize_t total = buffer - buffer_org;

                    if(total > 0)
                    {
                        ret = total;
                    }

                    return ret;
                }

                page->read_to = ret;
            }
            else
            {
                page->read_to = 0;
            }

            s64 cost_computation_end = timeus();

            // update the cost

            page->cost = (cost_computation_end - cost_computation_begin) * 2;

            group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

            page->timestamp = MAX_S64;
            u64_node *page_node = u64_set_insert(&bf->page_set.offset_to_page, page->position);
            page_node->value = page;
        }
        else
        {
            page = (buffered_file_page_t)node->value;
        }

        // copy to the page

        ssize_t available_in_page = page->size - in_page_from; // page->size, not read_to: a write may extend past the currently valid content

        ssize_t n = MIN(available_in_page, size);
        memcpy(&page->buffer[in_page_from], buffer, n);

        if(page->written_from > in_page_from)
        {
            page->written_from = in_page_from;
        }

        if(page->written_to_plus_one < in_page_from + n)
        {
            page->written_to_plus_one = in_page_from + n;
        }

        if(page->read_to < in_page_from + n)
        {
            page->read_to = in_page_from + n;
        }

        bf->position_requested += n;
        page->timestamp = timeus();

        size -= n;

        // move the page to the top of the MRU

        buffered_file_cache_set_page_as_most_recently_used(bf->page_cache, page);

        buffer += n;

        // if size == 0 the job is done
        // if page->read_to != page->size this is the end of the file

        if((size == 0) || (page->read_to != page->size))
        {
            if(bf->position_requested > bf->size)
            {
                bf->size = bf->position_requested;
            }

            group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);
            break;
        }

        // at this point, bf->position_requested & bf->page_cache->granularity_mask should be 0

        assert((bf->position_requested & bf->page_cache->granularity_mask) == 0);

        page_position += page->size;
        in_page_from = 0;
        page = NULL;
    }

    ssize_t total = buffer - buffer_org;

    return total;
}

static ssize_t
buffered_file_seek(file_t f, ssize_t position, int whence)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    switch(whence)
    {
        case SEEK_SET:
        {
            bf->position_requested = position;
            return position;
        }
        case SEEK_CUR:
        {
            if(bf->position_requested + position >= 0)
            {
                bf->position_requested += position;
                return bf->position_requested;
            }
            else
            {
                bf->position_requested = 0;
                return 0;
            }
        }
        case SEEK_END:
        {
            if(bf->size + position >= 0)
            {
                bf->position_requested = bf->size + position;
                return bf->position_requested;
            }
            else
            {
                bf->position_requested = 0;
                return 0;
            }
        }
        default:
        {
            return INVALID_ARGUMENT_ERROR;
        }
    }
}

static ssize_t
buffered_file_tell(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    return bf->position_requested;
}

static ya_result
buffered_file_flush(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    ssize_t ret = 0;
    bool moved = FALSE;

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    u64_set_iterator iter;
    u64_set_iterator_init(&bf->page_set.offset_to_page, &iter);
    while(u64_set_iterator_hasnext(&iter))
    {
        u64_node *node = u64_set_iterator_next_node(&iter);
        buffered_file_page *page = (buffered_file_page*)node->value;
        if(page->written_from <= page->written_to_plus_one)
        {
            // move to the position
            // write the bytes

            ssize_t target = page->position + page->written_from;
            ret = bf->buffered_file->vtbl->seek(bf->buffered_file, target, SEEK_SET);
            if(ret != target)
            {
                if(ret >= 0)    // the returned value does not match the expected position
                {
                    ret = INVALID_STATE_ERROR;
                }

                break;
            }

            size_t size = (size_t)page->written_to_plus_one - page->written_from;

            ret = bf->buffered_file->vtbl->write(bf->buffered_file, &page->buffer[page->written_from], size);

            if(ret != (ssize_t)size)
            {
                if(ret >= 0)    // the number of bytes does not match the expected written amount
                {
                    ret = INVALID_STATE_ERROR;
                }

                break;
            }

            page->written_from = page->size; // reset the dirty range to the "clean" sentinel, same convention as page_init/page_update
            page->written_to_plus_one = 0;

            moved = TRUE;
        }
    }

    group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    if(moved)
    {
        bf->position_current = bf->buffered_file->vtbl->seek(bf->buffered_file, bf->position_requested, SEEK_SET);
    }

    return ret;
}

static int
buffered_file_close(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    buffered_file_flush(f);

    group_mutex_lock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    u64_set_iterator iter;
    u64_set_iterator_init(&bf->page_set.offset_to_page, &iter);
    while(u64_set_iterator_hasnext(&iter))
    {
        u64_node *node = u64_set_iterator_next_node(&iter);
        buffered_file_page *page = (buffered_file_page*)node->value;

        buffered_file_cache_release_page(bf->page_cache, page);
    }
    u64_set_destroy(&bf->page_set.offset_to_page);

    group_mutex_unlock(&bf->page_set.mtx, GROUP_MUTEX_WRITE);

    bf->buffered_file->vtbl->close(bf->buffered_file);

    buffered_file_cache_release(bf->page_cache);

    bf->page_cache = NULL;
    bf->vtbl = NULL;
    group_mutex_destroy(&bf->page_set.mtx);
    ZFREE_OBJECT(bf);
    return SUCCESS;
}

static ssize_t
buffered_file_size(file_t f)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    return bf->size;
}

static int
buffered_file_resize(file_t f, ssize_t size)
{
    buffered_file_t bf = (buffered_file_t)f;

    if(bf->buffered_file == NULL)
    {
        return INVALID_STATE_ERROR;
    }

    if(size < 0)
    {
        return INVALID_ARGUMENT_ERROR;
    }

    int ret = file_resize(bf->buffered_file, size);

    if(ret >= 0)
    {
        bf->size = size;
    }

    return ret;
}

static const struct file_vtbl buffered_file_vtbl =
{
    buffered_file_read,
    buffered_file_write,
    buffered_file_seek,
    buffered_file_tell,
    buffered_file_flush,
    buffered_file_close,
    buffered_file_size,
    buffered_file_resize
};

ya_result
buffered_file_init(file_t *fp, file_t file_to_buffer_, buffered_file_cache_t fc)
{
    if(fp == NULL || file_to_buffer_ == NULL || fc == NULL)
    {
        return UNEXPECTED_NULL_ARGUMENT_ERROR;
    }

    file_t file_to_buffer = file_to_buffer_;

    buffered_file_t bf;
    ZALLOC_OBJECT_OR_DIE(bf, struct buffered_file_t_, BUFFERED_FILE_TAG);
    bf->vtbl = &buffered_file_vtbl;
    bf->buffered_file = file_to_buffer;
    buffered_file_cache_acquire(fc);
    bf->page_cache = fc;
    group_mutex_init(&bf->page_set.mtx);
    bf->page_set.offset_to_page = NULL;
    bf->position_current = file_to_buffer->vtbl->tell(file_to_buffer);
    bf->position_requested = bf->position_current;
    bf->size = file_to_buffer->vtbl->size(file_to_buffer);

    *fp = (file_t)bf;

    return SUCCESS;
}
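
/*
 * Minimal usage sketch (illustrative only): the underlying file_t is assumed
 * to have been opened elsewhere, and the cache parameters are hypothetical.
 *
 *   buffered_file_cache_t cache = buffered_file_cache_new_instance("demo", 256, 12, TRUE);
 *
 *   file_t f;
 *   if(ISOK(buffered_file_init(&f, raw_file, cache)))    // raw_file: an already-opened file_t
 *   {
 *       char tmp[512];
 *       f->vtbl->seek(f, 0, SEEK_SET);
 *       ssize_t n = f->vtbl->read(f, tmp, sizeof(tmp));  // served through the page cache
 *       // ...
 *       f->vtbl->close(f);    // flushes dirty pages, closes raw_file, drops the file's cache reference
 *   }
 *
 *   buffered_file_cache_delete(cache);    // releases the creator's reference
 */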

/** @} */